diff --git a/badger/cmd/bank.go b/badger/cmd/bank.go index d7853a550..bdac0eeee 100644 --- a/badger/cmd/bank.go +++ b/badger/cmd/bank.go @@ -357,14 +357,15 @@ func runTest(cmd *cobra.Command, args []string) error { // Open DB opts := badger.DefaultOptions(sstDir). WithValueDir(vlogDir). - WithMaxTableSize(4 << 20). // Force more compactions. + WithBaseTableSize(4 << 20). // Force more compactions. WithNumLevelZeroTables(2). WithNumMemtables(2). // Do not GC any versions, because we need them for the disect.. WithNumVersionsToKeep(int(math.MaxInt32)). WithValueThreshold(1). // Make all values go to value log WithCompression(options.ZSTD). - WithBlockCacheSize(10 << 20) + WithBlockCacheSize(10 << 20). + WithIndexCacheSize(10 << 20) if verbose { opts = opts.WithLoggingLevel(badger.DEBUG) diff --git a/badger/cmd/write_bench.go b/badger/cmd/write_bench.go index 91df956cc..782853cb1 100644 --- a/badger/cmd/write_bench.go +++ b/badger/cmd/write_bench.go @@ -66,7 +66,7 @@ var ( vlogMaxEntries uint32 loadBloomsOnOpen bool detectConflicts bool - compression bool + zstdComp bool showDir bool ttlDuration string showKeysCount bool @@ -113,8 +113,8 @@ func init() { "Load Bloom filter on DB open.") writeBenchCmd.Flags().BoolVar(&detectConflicts, "conficts", false, "If true, it badger will detect the conflicts") - writeBenchCmd.Flags().BoolVar(&compression, "compression", true, - "If true, badger will use ZSTD mode") + writeBenchCmd.Flags().BoolVar(&zstdComp, "zstd", false, + "If true, badger will use ZSTD mode. Otherwise, use default.") writeBenchCmd.Flags().BoolVar(&showDir, "show-dir", false, "If true, the report will include the directory contents") writeBenchCmd.Flags().StringVar(&dropAllPeriod, "dropall", "0s", @@ -260,12 +260,6 @@ func writeSorted(db *badger.DB, num uint64) error { } func writeBench(cmd *cobra.Command, args []string) error { - var cmode options.CompressionType - if compression { - cmode = options.ZSTD - } else { - cmode = options.None - } opt := badger.DefaultOptions(sstDir). WithValueDir(vlogDir). WithSyncWrites(syncWrites). @@ -277,8 +271,10 @@ func writeBench(cmd *cobra.Command, args []string) error { WithValueLogMaxEntries(vlogMaxEntries). WithEncryptionKey([]byte(encryptionKey)). WithDetectConflicts(detectConflicts). - WithCompression(cmode). WithLoggingLevel(badger.INFO) + if zstdComp { + opt = opt.WithCompression(options.ZSTD) + } if !showLogs { opt = opt.WithLogger(nil) @@ -314,6 +310,7 @@ func writeBench(cmd *cobra.Command, args []string) error { } c.SignalAndWait() + fmt.Printf(db.LevelsToString()) return err } @@ -354,11 +351,13 @@ func reportStats(c *z.Closer, db *badger.DB) { t := time.NewTicker(time.Second) defer t.Stop() + var count int for { select { case <-c.HasBeenClosed(): return case <-t.C: + count++ if showKeysCount { showKeysStats(db) } @@ -392,8 +391,13 @@ func reportStats(c *z.Closer, db *badger.DB) { bytesRate := sz / uint64(dur.Seconds()) entriesRate := entries / uint64(dur.Seconds()) fmt.Printf("[WRITE] Time elapsed: %s, bytes written: %s, speed: %s/sec, "+ - "entries written: %d, speed: %d/sec, gcSuccess: %d\n", y.FixedDuration(time.Since(startTime)), - humanize.Bytes(sz), humanize.Bytes(bytesRate), entries, entriesRate, gcSuccess) + "entries written: %d, speed: %d/sec, Memory: %s\n", + y.FixedDuration(time.Since(startTime)), + humanize.Bytes(sz), humanize.Bytes(bytesRate), entries, entriesRate, + humanize.IBytes(uint64(z.NumAllocBytes()))) + if count%10 == 0 { + fmt.Printf(db.LevelsToString()) + } } } } diff --git a/batch.go b/batch.go index 60fa2c0cf..c9bd838b7 100644 --- a/batch.go +++ b/batch.go @@ -18,6 +18,7 @@ package badger import ( "sync" + "sync/atomic" "github.com/dgraph-io/badger/v2/pb" "github.com/dgraph-io/badger/v2/y" @@ -30,7 +31,7 @@ type WriteBatch struct { txn *Txn db *DB throttle *y.Throttle - err error + err atomic.Value isManaged bool commitTs uint64 @@ -82,18 +83,16 @@ func (wb *WriteBatch) Cancel() { wb.txn.Discard() } -// The caller of this callback must hold the lock. func (wb *WriteBatch) callback(err error) { // sync.WaitGroup is thread-safe, so it doesn't need to be run inside wb.Lock. defer wb.throttle.Done(err) if err == nil { return } - - if wb.err != nil { + if err := wb.Error(); err != nil { return } - wb.err = err + wb.err.Store(err) } func (wb *WriteBatch) Write(kvList *pb.KVList) error { @@ -135,7 +134,7 @@ func (wb *WriteBatch) handleEntry(e *Entry) error { // This time the error must not be ErrTxnTooBig, otherwise, we make the // error permanent. if err := wb.txn.SetEntry(e); err != nil { - wb.err = err + wb.err.Store(err) return err } return nil @@ -172,7 +171,7 @@ func (wb *WriteBatch) Delete(k []byte) error { return err } if err := wb.txn.Delete(k); err != nil { - wb.err = err + wb.err.Store(err) return err } return nil @@ -180,20 +179,20 @@ func (wb *WriteBatch) Delete(k []byte) error { // Caller to commit must hold a write lock. func (wb *WriteBatch) commit() error { - if wb.err != nil { - return wb.err + if err := wb.Error(); err != nil { + return err } if wb.finished { return y.ErrCommitAfterFinish } if err := wb.throttle.Do(); err != nil { - wb.err = err - return wb.err + wb.err.Store(err) + return err } wb.txn.CommitWith(wb.callback) wb.txn = wb.db.newTransaction(true, wb.isManaged) wb.txn.commitTs = wb.commitTs - return wb.err + return wb.Error() } // Flush must be called at the end to ensure that any pending writes get committed to Badger. Flush @@ -210,18 +209,18 @@ func (wb *WriteBatch) Flush() error { wb.Unlock() if err := wb.throttle.Finish(); err != nil { - if wb.err != nil { - return errors.Errorf("wb.err: %s err: %s", wb.err, err) + if wb.Error() != nil { + return errors.Errorf("wb.err: %s err: %s", wb.Error(), err) } return err } - return wb.err + return wb.Error() } // Error returns any errors encountered so far. No commits would be run once an error is detected. func (wb *WriteBatch) Error() error { - wb.Lock() - defer wb.Unlock() - return wb.err + // If the interface conversion fails, the err will be nil. + err, _ := wb.err.Load().(error) + return err } diff --git a/compaction.go b/compaction.go index 6d0cbf6a7..56ce6457a 100644 --- a/compaction.go +++ b/compaction.go @@ -33,6 +33,10 @@ type keyRange struct { inf bool } +func (r keyRange) isEmpty() bool { + return len(r.left) == 0 && len(r.right) == 0 && !r.inf +} + var infRange = keyRange{inf: true} func (r keyRange) String() string { @@ -45,7 +49,26 @@ func (r keyRange) equals(dst keyRange) bool { r.inf == dst.inf } +func (r *keyRange) extend(kr keyRange) { + if r.isEmpty() { + *r = kr + } + if len(r.left) == 0 || y.CompareKeys(kr.left, r.left) < 0 { + r.left = kr.left + } + if len(r.right) == 0 || y.CompareKeys(kr.right, r.right) > 0 { + r.right = kr.right + } + if kr.inf { + r.inf = true + } +} + func (r keyRange) overlapsWith(dst keyRange) bool { + // Empty keyRange always overlaps. + if r.isEmpty() { + return true + } if r.inf || dst.inf { return true } @@ -127,6 +150,7 @@ func (lcs *levelCompactStatus) remove(dst keyRange) bool { type compactStatus struct { sync.RWMutex levels []*levelCompactStatus + tables map[uint64]struct{} } func (cs *compactStatus) overlapsWith(level int, this keyRange) bool { @@ -151,11 +175,10 @@ func (cs *compactStatus) compareAndAdd(_ thisAndNextLevelRLocked, cd compactDef) cs.Lock() defer cs.Unlock() - level := cd.thisLevel.level - - y.AssertTruef(level < len(cs.levels)-1, "Got level %d. Max levels: %d", level, len(cs.levels)) - thisLevel := cs.levels[level] - nextLevel := cs.levels[level+1] + tl := cd.thisLevel.level + y.AssertTruef(tl < len(cs.levels)-1, "Got level %d. Max levels: %d", tl, len(cs.levels)) + thisLevel := cs.levels[cd.thisLevel.level] + nextLevel := cs.levels[cd.nextLevel.level] if thisLevel.overlapsWith(cd.thisRange) { return false @@ -171,6 +194,9 @@ func (cs *compactStatus) compareAndAdd(_ thisAndNextLevelRLocked, cd compactDef) thisLevel.ranges = append(thisLevel.ranges, cd.thisRange) nextLevel.ranges = append(nextLevel.ranges, cd.nextRange) thisLevel.delSize += cd.thisSize + for _, t := range append(cd.top, cd.bot...) { + cs.tables[t.ID()] = struct{}{} + } return true } @@ -178,24 +204,31 @@ func (cs *compactStatus) delete(cd compactDef) { cs.Lock() defer cs.Unlock() - level := cd.thisLevel.level - y.AssertTruef(level < len(cs.levels)-1, "Got level %d. Max levels: %d", level, len(cs.levels)) + tl := cd.thisLevel.level + y.AssertTruef(tl < len(cs.levels)-1, "Got level %d. Max levels: %d", tl, len(cs.levels)) - thisLevel := cs.levels[level] - nextLevel := cs.levels[level+1] + thisLevel := cs.levels[cd.thisLevel.level] + nextLevel := cs.levels[cd.nextLevel.level] thisLevel.delSize -= cd.thisSize found := thisLevel.remove(cd.thisRange) - found = nextLevel.remove(cd.nextRange) && found + if !cd.nextRange.isEmpty() { + found = nextLevel.remove(cd.nextRange) && found + } if !found { this := cd.thisRange next := cd.nextRange - fmt.Printf("Looking for: [%q, %q, %v] in this level.\n", this.left, this.right, this.inf) + fmt.Printf("Looking for: %s in this level %d.\n", this, tl) fmt.Printf("This Level:\n%s\n", thisLevel.debug()) fmt.Println() - fmt.Printf("Looking for: [%q, %q, %v] in next level.\n", next.left, next.right, next.inf) + fmt.Printf("Looking for: %s in next level %d.\n", next, cd.nextLevel.level) fmt.Printf("Next Level:\n%s\n", nextLevel.debug()) log.Fatal("keyRange not found") } + for _, t := range append(cd.top, cd.bot...) { + _, ok := cs.tables[t.ID()] + y.AssertTrue(ok) + delete(cs.tables, t.ID()) + } } diff --git a/db.go b/db.go index c3e74c6ef..3d705db7a 100644 --- a/db.go +++ b/db.go @@ -26,6 +26,7 @@ import ( "os" "path/filepath" "sort" + "strings" "sync" "sync/atomic" "time" @@ -85,11 +86,6 @@ type DB struct { flushChan chan flushTask // For flushing memtables. closeOnce sync.Once // For closing DB only once. - // Number of log rotates since the last memtable flush. We will access this field via atomic - // functions. Since we are not going to use any 64bit atomic functions, there is no need for - // 64 bit alignment of this struct(see #311). - logRotates int32 - blockWrites int32 isClosed uint32 @@ -112,10 +108,11 @@ func checkAndSetOptions(opt *Options) error { if opt.NumCompactors == 1 { return errors.New("Cannot have 1 compactor. Need at least 2") } + if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") { return errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set") } - opt.maxBatchSize = (15 * opt.MaxTableSize) / 100 + opt.maxBatchSize = (15 * opt.MemTableSize) / 100 opt.maxBatchCount = opt.maxBatchSize / int64(skl.MaxNodeSize) // We are limiting opt.ValueThreshold to maxValueThreshold for now. @@ -127,8 +124,9 @@ func checkAndSetOptions(opt *Options) error { // If ValueThreshold is greater than opt.maxBatchSize, we won't be able to push any data using // the transaction APIs. Transaction batches entries into batches of size opt.maxBatchSize. if int64(opt.ValueThreshold) > opt.maxBatchSize { - return errors.Errorf("Valuethreshold greater than max batch size of %d. Either "+ - "reduce opt.ValueThreshold or increase opt.MaxTableSize.", opt.maxBatchSize) + return errors.Errorf("Valuethreshold %d greater than max batch size of %d. Either "+ + "reduce opt.ValueThreshold or increase opt.MaxTableSize.", + opt.ValueThreshold, opt.maxBatchSize) } // ValueLogFileSize should be stricly LESS than 2<<30 otherwise we will // overflow the uint32 when we mmap it in OpenMemtable. @@ -141,10 +139,6 @@ func checkAndSetOptions(opt *Options) error { return y.ErrZstdCgo } - // Compact L0 on close if either it is set or if KeepL0InMemory is set. When - // keepL0InMemory is set we need to compact L0 on close otherwise we might lose data. - opt.CompactL0OnClose = opt.CompactL0OnClose - if opt.ReadOnly { // Do not perform compaction in read only mode. opt.CompactL0OnClose = false @@ -257,7 +251,7 @@ func Open(opt Options) (*DB, error) { if opt.IndexCacheSize > 0 { // Index size is around 5% of the table size. - indexSz := int64(float64(opt.MaxTableSize) * 0.05) + indexSz := int64(float64(opt.MemTableSize) * 0.05) numInCache := opt.IndexCacheSize / indexSz if numInCache == 0 { // Make the value of this variable at least one since the cache requires @@ -479,6 +473,7 @@ func (db *DB) IsClosed() bool { func (db *DB) close() (err error) { db.opt.Debugf("Closing database") + db.opt.Infof("Lifetime L0 stalled for: %s\n", time.Duration(db.lc.l0stallsMs)) atomic.StoreInt32(&db.blockWrites, 1) @@ -550,6 +545,7 @@ func (db *DB) close() (err error) { } } + db.opt.Infof(db.LevelsToString()) if lcErr := db.lc.close(); err == nil { err = y.Wrap(lcErr, "DB.Close") } @@ -912,18 +908,7 @@ func (db *DB) ensureRoomForWrite() error { db.Lock() defer db.Unlock() - // Here we determine if we need to force flush memtable. Given we rotated log file, it would - // make sense to force flush a memtable, so the updated value head would have a chance to be - // pushed to L0. Otherwise, it would not go to L0, until the memtable has been fully filled, - // which can take a lot longer if the write load has fewer keys and larger values. This force - // flush, thus avoids the need to read through a lot of log files on a crash and restart. - // Above approach is quite simple with small drawback. We are calling ensureRoomForWrite before - // inserting every entry in Memtable. We will get latest db.head after all entries for a request - // are inserted in Memtable. If we have done >= db.logRotates rotations, then while inserting - // first entry in Memtable, below condition will be true and we will endup flushing old value of - // db.head. Hence we are limiting no of value log files to be read to db.logRotates only. - forceFlush := atomic.LoadInt32(&db.logRotates) >= db.opt.LogRotatesToFlush - + var forceFlush bool // We don't need to force flush the memtable in in-memory mode because the size of the WAL will // always be zero. if !forceFlush && !db.opt.InMemory { @@ -931,16 +916,13 @@ func (db *DB) ensureRoomForWrite() error { forceFlush = int64(db.mt.wal.writeAt) > db.opt.ValueLogFileSize } - if !forceFlush && db.mt.sl.MemSize() < db.opt.MaxTableSize { + if !forceFlush && db.mt.sl.MemSize() < db.opt.MemTableSize { return nil } y.AssertTrue(db.mt != nil) // A nil mt indicates that DB is being closed. select { case db.flushChan <- flushTask{mt: db.mt}: - // After every memtable flush, let's reset the counter. - atomic.StoreInt32(&db.logRotates, 0) - db.opt.Debugf("Flushing memtable, mt.size=%d size of flushChan: %d\n", db.mt.sl.MemSize(), len(db.flushChan)) // We manage to push this task. Let's modify imm. @@ -958,7 +940,7 @@ func (db *DB) ensureRoomForWrite() error { } func arenaSize(opt Options) int64 { - return opt.MaxTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize) + return opt.MemTableSize + opt.maxBatchSize + opt.maxBatchCount*int64(skl.MaxNodeSize) } // buildL0Table builds a new table from the memtable. @@ -989,8 +971,7 @@ type flushTask struct { // handleFlushTask must be run serially. func (db *DB) handleFlushTask(ft flushTask) error { - // There can be a scenario, when empty memtable is flushed. For example, memtable is empty and - // after writing request to value log, rotation count exceeds db.LogRotatesToFlush. + // There can be a scenario, when empty memtable is flushed. if ft.mt.sl.Empty() { return nil } @@ -1014,7 +995,12 @@ func (db *DB) handleFlushTask(ft flushTask) error { } fileID := db.lc.reserveFileID() - tbl, err := table.CreateTable(table.NewFilename(fileID, db.opt.Dir), tableData, bopts) + var tbl *table.Table + if db.opt.InMemory { + tbl, err = table.OpenInMemoryTable(tableData, fileID, &bopts) + } else { + tbl, err = table.CreateTable(table.NewFilename(fileID, db.opt.Dir), tableData, bopts) + } if err != nil { return y.Wrap(err, "error while creating table") } @@ -1306,6 +1292,11 @@ func (db *DB) Tables() []TableInfo { return db.lc.getTableInfo() } +// Levels gets the LevelInfo. +func (db *DB) Levels() []LevelInfo { + return db.lc.getLevelInfo() +} + // KeySplits can be used to get rough key ranges to divide up iteration over // the DB. func (db *DB) KeySplits(prefix []byte) []string { @@ -1438,6 +1429,7 @@ func (db *DB) startMemoryFlush() { // stopped. Ideally, no writes are going on during Flatten. Otherwise, it would create competition // between flattening the tree and new tables being created at level zero. func (db *DB) Flatten(workers int) error { + db.stopCompactions() defer db.startCompactions() @@ -1473,13 +1465,14 @@ func (db *DB) Flatten(workers int) error { return humanize.Bytes(uint64(sz)) } + t := db.lc.levelTargets() for { db.opt.Infof("\n") var levels []int for i, l := range db.lc.levels { sz := l.getTotalSize() db.opt.Infof("Level: %d. %8s Size. %8s Max.\n", - i, hbytes(l.getTotalSize()), hbytes(l.maxTotalSize)) + i, hbytes(l.getTotalSize()), hbytes(t.targetSz[i])) if sz > 0 { levels = append(levels, i) } @@ -1630,7 +1623,7 @@ func (db *DB) dropAll() (func(), error) { // - Compact rest of the levels, Li->Li, picking tables which have Kp. // - Resume memtable flushes, compactions and writes. func (db *DB) DropPrefix(prefixes ...[]byte) error { - db.opt.Infof("DropPrefix Called") + db.opt.Infof("DropPrefix Called %s", prefixes) f, err := db.prepareToDrop() if err != nil { return err @@ -1835,3 +1828,28 @@ func (db *DB) CacheMaxCost(cache CacheType, maxCost int64) (int64, error) { return 0, errors.Errorf("invalid cache type") } } + +func (db *DB) LevelsToString() string { + levels := db.Levels() + h := func(sz int64) string { + return humanize.IBytes(uint64(sz)) + } + base := func(b bool) string { + if b { + return "B" + } + return " " + } + + var b strings.Builder + b.WriteRune('\n') + for _, li := range levels { + b.WriteString(fmt.Sprintf( + "Level %d [%s]: NumTables: %02d. Size: %s of %s. Score: %.2f->%.2f"+ + " Target FileSize: %s\n", + li.Level, base(li.IsBaseLevel), li.NumTables, + h(li.Size), h(li.TargetSize), li.Score, li.Adjusted, h(li.TargetFileSize))) + } + b.WriteString("Level Done\n") + return b.String() +} diff --git a/db2_test.go b/db2_test.go index daa5607d9..8b591efdd 100644 --- a/db2_test.go +++ b/db2_test.go @@ -103,7 +103,7 @@ func TestTruncateVlogWithClose(t *testing.T) { for i := 0; i < 32; i++ { err := db.View(func(txn *Txn) error { item, err := txn.Get(key(i)) - require.NoError(t, err) + require.NoError(t, err, "key: %s", key(i)) val := getItemValue(t, item) require.Equal(t, 10, len(val)) return nil @@ -211,7 +211,7 @@ func TestBigKeyValuePairs(t *testing.T) { // Passing an empty directory since it will be filled by runBadgerTest. opts := DefaultOptions(""). - WithMaxTableSize(1 << 20). + WithBaseTableSize(1 << 20). WithValueLogMaxEntries(64) runBadgerTest(t, &opts, func(t *testing.T, db *DB) { bigK := make([]byte, 65001) diff --git a/db_test.go b/db_test.go index 8ef6d359c..baaa07658 100644 --- a/db_test.go +++ b/db_test.go @@ -67,8 +67,9 @@ func (s *DB) validate() error { return s.lc.validate() } func getTestOptions(dir string) Options { opt := DefaultOptions(dir). - WithMaxTableSize(1 << 15). // Force more compaction. - WithLevelOneSize(4 << 15). // Force more compaction. + WithMemTableSize(1 << 15). + WithBaseTableSize(1 << 15). // Force more compaction. + WithBaseLevelSize(4 << 15). // Force more compaction. WithSyncWrites(false) return opt } @@ -457,8 +458,8 @@ func BenchmarkDbGrowth(b *testing.B) { maxWrites := 200 opts := getTestOptions(dir) opts.ValueLogFileSize = 64 << 15 - opts.MaxTableSize = 4 << 15 - opts.LevelOneSize = 16 << 15 + opts.BaseTableSize = 4 << 15 + opts.BaseLevelSize = 16 << 15 opts.NumVersionsToKeep = 1 opts.NumLevelZeroTables = 1 opts.NumLevelZeroTablesStall = 2 @@ -1751,10 +1752,11 @@ func TestLSMOnly(t *testing.T) { // Also test for error, when ValueThresholdSize is greater than maxBatchSize. dopts.ValueThreshold = LSMOnlyOptions(dir).ValueThreshold // maxBatchSize is calculated from MaxTableSize. - dopts.MaxTableSize = int64(LSMOnlyOptions(dir).ValueThreshold) + dopts.MemTableSize = int64(LSMOnlyOptions(dir).ValueThreshold) _, err = Open(dopts) require.Error(t, err, "db creation should have been failed") - require.Contains(t, err.Error(), "Valuethreshold greater than max batch size") + require.Contains(t, err.Error(), + fmt.Sprintf("Valuethreshold %d greater than max batch size", dopts.ValueThreshold)) opts.ValueLogMaxEntries = 100 db, err := Open(opts) @@ -2024,7 +2026,6 @@ func TestForceFlushMemtable(t *testing.T) { ops := getTestOptions(dir) ops.ValueLogMaxEntries = 1 - ops.LogRotatesToFlush = 1 db, err := Open(ops) require.NoError(t, err, "error while openning db") diff --git a/go.mod b/go.mod index 001560237..bb7e17852 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ go 1.12 require ( github.com/DataDog/zstd v1.4.1 github.com/cespare/xxhash v1.1.0 - github.com/dgraph-io/ristretto v0.0.4-0.20201022105248-f32a01612740 + github.com/dgraph-io/ristretto v0.0.4-0.20201023213945-72c2139ec27f github.com/dustin/go-humanize v1.0.0 github.com/golang/protobuf v1.3.1 github.com/golang/snappy v0.0.1 diff --git a/go.sum b/go.sum index b4d2b6275..6a082e276 100644 --- a/go.sum +++ b/go.sum @@ -15,10 +15,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto v0.0.4-0.20201020115802-f071429c1049 h1:lT8vahI6E7R84KeSsXvj3QST/OusCP8g5rEctzsjUIA= -github.com/dgraph-io/ristretto v0.0.4-0.20201020115802-f071429c1049/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs= -github.com/dgraph-io/ristretto v0.0.4-0.20201022105248-f32a01612740 h1:TzbxnxH3PoFUWx5024RX1+uqLnUVbfdHANjrHMb5Xnc= -github.com/dgraph-io/ristretto v0.0.4-0.20201022105248-f32a01612740/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs= +github.com/dgraph-io/ristretto v0.0.4-0.20201023213945-72c2139ec27f h1:YPDUnM9Rkd0V41Ie43v/QoNgz5NNGcZv05UnYEnQgo4= +github.com/dgraph-io/ristretto v0.0.4-0.20201023213945-72c2139ec27f/go.mod h1:bDI4cDaalvYSji3vBVDKrn9ouDZrwN974u8ZO/AhYXs= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= diff --git a/level_handler.go b/level_handler.go index 4fa6cae53..3836be28d 100644 --- a/level_handler.go +++ b/level_handler.go @@ -36,10 +36,9 @@ type levelHandler struct { totalSize int64 // The following are initialized once and const. - level int - strLevel string - maxTotalSize int64 - db *DB + level int + strLevel string + db *DB } func (s *levelHandler) getTotalSize() int64 { diff --git a/levels.go b/levels.go index bf6183906..27c00af72 100644 --- a/levels.go +++ b/levels.go @@ -21,6 +21,7 @@ import ( "context" "encoding/hex" "fmt" + "math" "math/rand" "os" "sort" @@ -45,7 +46,8 @@ type levelsController struct { levels []*levelHandler kv *DB - cstatus compactStatus + cstatus compactStatus + l0stallsMs int64 } // revertToManifest checks that all necessary table files exist and removes all table files not @@ -79,19 +81,11 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) { kv: db, levels: make([]*levelHandler, db.opt.MaxLevels), } + s.cstatus.tables = make(map[uint64]struct{}) s.cstatus.levels = make([]*levelCompactStatus, db.opt.MaxLevels) for i := 0; i < db.opt.MaxLevels; i++ { s.levels[i] = newLevelHandler(db, i) - switch i { - case 0: - // Do nothing. - case 1: - // Level 1 probably shouldn't be too much bigger than level 0. - s.levels[i].maxTotalSize = db.opt.LevelOneSize - default: - s.levels[i].maxTotalSize = s.levels[i-1].maxTotalSize * int64(db.opt.LevelSizeMultiplier) - } s.cstatus.levels[i] = new(levelCompactStatus) } @@ -343,8 +337,10 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error { top: nil, bot: operation, dropPrefixes: prefixes, + t: s.levelTargets(), } - if err := s.runCompactDef(l.level, cd); err != nil { + cd.t.baseLevel = l.level + if err := s.runCompactDef(-1, l.level, cd); err != nil { opt.Warningf("While running compact def: %+v. Error: %v", cd, err) return err } @@ -357,12 +353,70 @@ func (s *levelsController) startCompact(lc *z.Closer) { n := s.kv.opt.NumCompactors lc.AddRunning(n - 1) for i := 0; i < n; i++ { - // The worker with id=0 is dedicated to L0 and L1. This is not counted - // towards the user specified NumCompactors. go s.runCompactor(i, lc) } } +type targets struct { + baseLevel int + targetSz []int64 + fileSz []int64 +} + +// levelTargets calculates the targets for levels in the LSM tree. The idea comes from Dynamic Level +// Sizes ( https://rocksdb.org/blog/2015/07/23/dynamic-level.html ) in RocksDB. The sizes of levels +// are calculated based on the size of the lowest level, typically L6. So, if L6 size is 1GB, then +// L5 target size is 100MB, L4 target size is 10MB and so on. +// +// L0 files don't automatically go to L1. Instead, they get compacted to Lbase, where Lbase is +// chosen based on the first level which is non-empty from top (check L1 through L6). For an empty +// DB, that would be L6. So, L0 compactions go to L6, then L5, L4 and so on. +// +// Lbase is advanced to the upper levels when its target size exceeds BaseLevelSize. For +// example, when L6 reaches 1.1GB, then L4 target sizes becomes 11MB, thus exceeding the +// BaseLevelSize of 10MB. L3 would then become the new Lbase, with a target size of 1MB < +// BaseLevelSize. +func (s *levelsController) levelTargets() targets { + adjust := func(sz int64) int64 { + if sz < s.kv.opt.BaseLevelSize { + return s.kv.opt.BaseLevelSize + } + return sz + } + + t := targets{ + targetSz: make([]int64, len(s.levels)), + fileSz: make([]int64, len(s.levels)), + } + // DB size is the size of the last level. + dbSize := s.lastLevel().getTotalSize() + for i := len(s.levels) - 1; i > 0; i-- { + ltarget := adjust(dbSize) + t.targetSz[i] = ltarget + if t.baseLevel == 0 && ltarget <= s.kv.opt.BaseLevelSize { + t.baseLevel = i + } + dbSize /= int64(s.kv.opt.LevelSizeMultiplier) + } + + tsz := s.kv.opt.BaseTableSize + for i := 0; i < len(s.levels); i++ { + if i == 0 { + // Use MemTableSize for Level 0. Because at Level 0, we stop compactions based on the + // number of tables, not the size of the level. So, having a 1:1 size ratio between + // memtable size and the size of L0 files is better than churning out 32 files per + // memtable (assuming 64MB MemTableSize and 2MB BaseTableSize). + t.fileSz[i] = s.kv.opt.MemTableSize + } else if i <= t.baseLevel { + t.fileSz[i] = tsz + } else { + tsz *= int64(s.kv.opt.TableSizeMultiplier) + t.fileSz[i] = tsz + } + } + return t +} + func (s *levelsController) runCompactor(id int, lc *z.Closer) { defer lc.Done() @@ -374,90 +428,143 @@ func (s *levelsController) runCompactor(id int, lc *z.Closer) { return } - ticker := time.NewTicker(100 * time.Millisecond) + ticker := time.NewTicker(50 * time.Millisecond) defer ticker.Stop() + moveL0toFront := func(prios []compactionPriority) []compactionPriority { + idx := -1 + for i, p := range prios { + if p.level == 0 { + idx = i + break + } + } + // If idx == -1, we didn't find L0. + // If idx == 0, then we don't need to do anything. L0 is already at the front. + if idx > 0 { + out := append([]compactionPriority{}, prios[idx]) + out = append(out, prios[:idx]...) + out = append(out, prios[idx+1:]...) + return out + } + return prios + } + + runOnce := func() bool { + prios := s.pickCompactLevels() + if id == 0 { + // Worker ID zero prefers to compact L0 always. + prios = moveL0toFront(prios) + } + for _, p := range prios { + if id == 0 && p.level == 0 { + // Allow worker zero to run level 0, irrespective of its adjusted score. + } else if p.adjusted < 1.0 { + break + } + + err := s.doCompact(id, p) + switch err { + case nil: + return true + case errFillTables: + // pass + default: + s.kv.opt.Warningf("While running doCompact: %v\n", err) + } + } + return false + } + for { select { // Can add a done channel or other stuff. case <-ticker.C: - prios := s.pickCompactLevels() - loop: - for _, p := range prios { - if id == 0 && p.level > 1 { - // If I'm ID zero, I only compact L0 and L1. - continue - } - if id != 0 && p.level <= 1 { - // If I'm ID non-zero, I do NOT compact L0 and L1. - continue - } - err := s.doCompact(id, p) - switch err { - case nil: - break loop - case errFillTables: - // pass - default: - s.kv.opt.Warningf("While running doCompact: %v\n", err) - } - } + runOnce() case <-lc.HasBeenClosed(): return } } } -// Returns true if level zero may be compacted, without accounting for compactions that already -// might be happening. -func (s *levelsController) isLevel0Compactable() bool { - return s.levels[0].numTables() >= s.kv.opt.NumLevelZeroTables -} - -// Returns true if the non-zero level may be compacted. delSize provides the size of the tables -// which are currently being compacted so that we treat them as already having started being -// compacted (because they have been, yet their size is already counted in getTotalSize). -func (l *levelHandler) isCompactable(delSize int64) bool { - return l.getTotalSize()-delSize >= l.maxTotalSize -} - type compactionPriority struct { level int score float64 + adjusted float64 dropPrefixes [][]byte + t targets +} + +func (s *levelsController) lastLevel() *levelHandler { + return s.levels[len(s.levels)-1] } // pickCompactLevel determines which level to compact. // Based on: https://github.com/facebook/rocksdb/wiki/Leveled-Compaction func (s *levelsController) pickCompactLevels() (prios []compactionPriority) { - // This function must use identical criteria for guaranteeing compaction's progress that - // addLevel0Table uses. - - // cstatus is checked to see if level 0's tables are already being compacted - if !s.cstatus.overlapsWith(0, infRange) && s.isLevel0Compactable() { + t := s.levelTargets() + addPriority := func(level int, score float64) { pri := compactionPriority{ - level: 0, - score: float64(s.levels[0].numTables()) / float64(s.kv.opt.NumLevelZeroTables), + level: level, + score: score, + adjusted: score, + t: t, } prios = append(prios, pri) } - for i, l := range s.levels[1:] { + // Add L0 priority based on the number of tables. + addPriority(0, float64(s.levels[0].numTables())/float64(s.kv.opt.NumLevelZeroTables)) + + // All other levels use size to calculate priority. + for i := 1; i < len(s.levels); i++ { // Don't consider those tables that are already being compacted right now. - delSize := s.cstatus.delSize(i + 1) + delSize := s.cstatus.delSize(i) - if l.isCompactable(delSize) { - pri := compactionPriority{ - level: i + 1, - score: float64(l.getTotalSize()-delSize) / float64(l.maxTotalSize), + l := s.levels[i] + sz := l.getTotalSize() - delSize + addPriority(i, float64(sz)/float64(t.targetSz[i])) + } + y.AssertTrue(len(prios) == len(s.levels)) + + // The following code is borrowed from PebbleDB and results in healthier LSM tree structure. + // If Li-1 has score > 1.0, then we'll divide Li-1 score by Li. If Li score is >= 1.0, then Li-1 + // score is reduced, which means we'll prioritize the compaction of lower levels (L5, L4 and so + // on) over the higher levels (L0, L1 and so on). On the other hand, if Li score is < 1.0, then + // we'll increase the priority of Li-1. + // Overall what this means is, if the bottom level is already overflowing, then de-prioritize + // compaction of the above level. If the bottom level is not full, then increase the priority of + // above level. + var prevLevel int + for level := t.baseLevel; level < len(s.levels); level++ { + if prios[prevLevel].adjusted >= 1 { + // Avoid absurdly large scores by placing a floor on the score that we'll + // adjust a level by. The value of 0.01 was chosen somewhat arbitrarily + const minScore = 0.01 + if prios[level].score >= minScore { + prios[prevLevel].adjusted /= prios[level].adjusted + } else { + prios[prevLevel].adjusted /= minScore } - prios = append(prios, pri) + } + prevLevel = level + } + + // Pick all the levels whose original score is >= 1.0, irrespective of their adjusted score. + // We'll still sort them by their adjusted score below. Having both these scores allows us to + // make better decisions about compacting L0. If we see a score >= 1.0, we can do L0->L0 + // compactions. If the adjusted score >= 1.0, then we can do L0->Lbase compactions. + out := prios[:0] + for _, p := range prios[:len(prios)-1] { + if p.score >= 1.0 { + out = append(out, p) } } - // We should continue to sort the compaction priorities by score. Now that we have a dedicated - // compactor for L0 and L1, we don't need to sort by level here. + prios = out + + // Sort by the adjusted score. sort.Slice(prios, func(i, j int) bool { - return prios[i].score > prios[j].score + return prios[i].adjusted > prios[j].adjusted }) return prios } @@ -479,23 +586,25 @@ func (s *levelsController) checkOverlap(tables []*table.Table, lev int) bool { return false } -// compactBuildTables merges topTables and botTables to form a list of new tables. -func (s *levelsController) compactBuildTables( - lev int, cd compactDef) ([]*table.Table, func() error, error) { - topTables := cd.top - botTables := cd.bot - - numTables := int64(len(topTables) + len(botTables)) - y.NumCompactionTables.Add(numTables) - defer y.NumCompactionTables.Add(-numTables) - - cd.span.Annotatef(nil, "Top tables count: %v Bottom tables count: %v", - len(topTables), len(botTables)) +// subcompact runs a single sub-compaction, iterating over the specified key-range only. +// +// We use splits to do a single compaction concurrently. If we have >= 3 tables +// involved in the bottom level during compaction, we choose key ranges to +// split the main compaction up into sub-compactions. Each sub-compaction runs +// concurrently, only iterating over the provided key range, generating tables. +// This speeds up the compaction significantly. +func (s *levelsController) subcompact(it y.Iterator, kr keyRange, cd compactDef, + inflightBuilders *y.Throttle, res chan<- *table.Table) { // Check overlap of the top level with the levels which are not being // compacted in this compaction. hasOverlap := s.checkOverlap(cd.allTables(), cd.nextLevel.level+1) + // Pick a discard ts, so we can discard versions below this ts. We should + // never discard any versions starting from above this timestamp, because + // that would affect the snapshot view guarantee provided by transactions. + discardTs := s.kv.orc.discardAtOrBelow() + // Try to collect stats so that we can inform value log about GC. That would help us find which // value log file should be GCed. discardStats := make(map[uint32]int64) @@ -511,66 +620,31 @@ func (s *levelsController) compactBuildTables( } } - // Create iterators across all the tables involved first. - var iters []y.Iterator - switch { - case lev == 0: - iters = appendIteratorsReversed(iters, topTables, table.NOCACHE) - case len(topTables) > 0: - y.AssertTrue(len(topTables) == 1) - iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)} - } - - // Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap. - var valid []*table.Table - -nextTable: - for _, table := range botTables { - if len(cd.dropPrefixes) > 0 { - for _, prefix := range cd.dropPrefixes { - if bytes.HasPrefix(table.Smallest(), prefix) && - bytes.HasPrefix(table.Biggest(), prefix) { - // All the keys in this table have the dropPrefix. So, this - // table does not need to be in the iterator and can be - // dropped immediately. - continue nextTable - } - } + // exceedsAllowedOverlap returns true if the given key range would overlap with more than 10 + // tables from level below nextLevel (nextLevel+1). This helps avoid generating tables at Li + // with huge overlaps with Li+1. + exceedsAllowedOverlap := func(kr keyRange) bool { + n2n := cd.nextLevel.level + 1 + if n2n <= 1 || n2n >= len(s.levels) { + return false } - valid = append(valid, table) - } - iters = append(iters, table.NewConcatIterator(valid, table.NOCACHE)) - it := table.NewMergeIterator(iters, false) - defer it.Close() // Important to close the iterator to do ref counting. + n2nl := s.levels[n2n] + n2nl.RLock() + defer n2nl.RUnlock() - it.Rewind() - - // Pick a discard ts, so we can discard versions below this ts. We should - // never discard any versions starting from above this timestamp, because - // that would affect the snapshot view guarantee provided by transactions. - discardTs := s.kv.orc.discardAtOrBelow() + l, r := n2nl.overlappingTables(levelHandlerRLocked{}, kr) + return r-l >= 10 + } - var numBuilds, numVersions int var lastKey, skipKey []byte + var numBuilds, numVersions int var vp valuePointer - var newTables []*table.Table - mu := new(sync.Mutex) // Guards newTables - inflightBuilders := y.NewThrottle(5) - for it.Valid() { + addKeys := func(builder *table.Builder) { timeStart := time.Now() - dk, err := s.kv.registry.LatestDataKey() - if err != nil { - return nil, nil, - y.Wrapf(err, "Error while retrieving datakey in levelsController.compactBuildTables") - } - bopts := buildTableOptions(s.kv.opt) - bopts.DataKey = dk - // Builder does not need cache but the same options are used for opening table. - bopts.BlockCache = s.kv.blockCache - bopts.IndexCache = s.kv.indexCache - builder := table.NewTableBuilder(bopts) var numKeys, numSkips uint64 + var rangeCheck int + var tableKr keyRange for ; it.Valid(); it.Next() { // See if we need to skip the prefix. if len(cd.dropPrefixes) > 0 && hasAnyPrefixes(it.Key(), cd.dropPrefixes) { @@ -591,7 +665,10 @@ nextTable: } if !y.SameKey(it.Key(), lastKey) { - if builder.ReachedCapacity(uint64(float64(s.kv.opt.MaxTableSize) * 0.9)) { + if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 { + break + } + if builder.ReachedCapacity() { // Only break if we are on a different key, and have reached capacity. We want // to ensure that all versions of the key are stored in the same sstable, and // not divided across multiple tables at the same level. @@ -599,6 +676,24 @@ nextTable: } lastKey = y.SafeCopy(lastKey, it.Key()) numVersions = 0 + + if len(tableKr.left) == 0 { + tableKr.left = y.SafeCopy(tableKr.left, it.Key()) + } + tableKr.right = lastKey + + rangeCheck++ + if rangeCheck%5000 == 0 { + // This table's range exceeds the allowed range overlap with the level after + // next. So, we stop writing to this table. If we don't do this, then we end up + // doing very expensive compactions involving too many tables. To amortize the + // cost of this check, we do it only every N keys. + if exceedsAllowedOverlap(tableKr) { + // s.kv.opt.Debugf("L%d -> L%d Breaking due to exceedsAllowedOverlap with + // kr: %s\n", cd.thisLevel.level, cd.nextLevel.level, tableKr) + break + } + } } vs := it.Value() @@ -649,10 +744,40 @@ nextTable: } builder.Add(it.Key(), vs, vp.Len) } + s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v", + numKeys, numSkips, time.Since(timeStart).Round(time.Millisecond)) + } // End of function: addKeys + + if len(kr.left) > 0 { + it.Seek(kr.left) + } else { + it.Rewind() + } + for it.Valid() { + if len(kr.right) > 0 && y.CompareKeys(it.Key(), kr.right) >= 0 { + break + } + + dk, err := s.kv.registry.LatestDataKey() + if err != nil { + inflightBuilders.Done(y.Wrapf(err, "Error while retrieving datakey in levelsController.compactBuildTables")) + return + } + bopts := buildTableOptions(s.kv.opt) + bopts.DataKey = dk + // Builder does not need cache but the same options are used for opening table. + bopts.BlockCache = s.kv.blockCache + bopts.IndexCache = s.kv.indexCache + + // Set TableSize to the target file size for that level. + bopts.TableSize = uint64(cd.t.fileSz[cd.nextLevel.level]) + builder := table.NewTableBuilder(bopts) + + // This would do the iteration and add keys to builder. + addKeys(builder) + // It was true that it.Valid() at least once in the loop above, which means we // called Add() at least once, and builder is not Empty(). - s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v", - numKeys, numSkips, time.Since(timeStart)) if builder.Empty() { // Cleanup builder resources: builder.Finish(false) @@ -686,19 +811,88 @@ nextTable: if err != nil { return } + res <- tbl + }(builder) + } + s.kv.vlog.updateDiscardStats(discardStats) + s.kv.opt.Debugf("Discard stats: %v", discardStats) + inflightBuilders.Done(nil) +} - mu.Lock() - newTables = append(newTables, tbl) - // num := atomic.LoadInt32(&table.NumBlocks) - mu.Unlock() +// compactBuildTables merges topTables and botTables to form a list of new tables. +func (s *levelsController) compactBuildTables( + lev int, cd compactDef) ([]*table.Table, func() error, error) { - // TODO(ibrahim): When ristretto PR #186 merges, bring this back. - // s.kv.opt.Debugf("Num Blocks: %d. Num Allocs (MB): %.2f\n", num, (z.NumAllocBytes() / 1 << 20)) - }(builder) + topTables := cd.top + botTables := cd.bot + + numTables := int64(len(topTables) + len(botTables)) + y.NumCompactionTables.Add(numTables) + defer y.NumCompactionTables.Add(-numTables) + + cd.span.Annotatef(nil, "Top tables count: %v Bottom tables count: %v", + len(topTables), len(botTables)) + + keepTable := func(t *table.Table) bool { + for _, prefix := range cd.dropPrefixes { + if bytes.HasPrefix(t.Smallest(), prefix) && + bytes.HasPrefix(t.Biggest(), prefix) { + // All the keys in this table have the dropPrefix. So, this + // table does not need to be in the iterator and can be + // dropped immediately. + return false + } + } + return true + } + var valid []*table.Table + for _, table := range botTables { + if keepTable(table) { + valid = append(valid, table) + } + } + + newIterator := func() []y.Iterator { + // Create iterators across all the tables involved first. + var iters []y.Iterator + switch { + case lev == 0: + iters = appendIteratorsReversed(iters, topTables, table.NOCACHE) + case len(topTables) > 0: + y.AssertTrue(len(topTables) == 1) + iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)} + } + // Next level has level>=1 and we can use ConcatIterator as key ranges do not overlap. + return append(iters, table.NewConcatIterator(valid, table.NOCACHE)) + } + + res := make(chan *table.Table, 3) + inflightBuilders := y.NewThrottle(8 + len(cd.splits)) + for _, kr := range cd.splits { + // Initiate Do here so we can register the goroutines for buildTables too. + inflightBuilders.Do() + go func(kr keyRange) { + it := table.NewMergeIterator(newIterator(), false) + defer it.Close() + s.subcompact(it, kr, cd, inflightBuilders, res) + }(kr) } + var newTables []*table.Table + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for t := range res { + newTables = append(newTables, t) + } + }() + // Wait for all table builders to finish and also for newTables accumulator to finish. err := inflightBuilders.Finish() + close(res) + wg.Wait() // Wait for all tables to be picked up. + if err == nil { // Ensure created files' directory entries are visible. We don't mind the extra latency // from not doing this ASAP after all file creation has finished because this is a @@ -716,8 +910,6 @@ nextTable: sort.Slice(newTables, func(i, j int) bool { return y.CompareKeys(newTables[i].Biggest(), newTables[j].Biggest()) < 0 }) - s.kv.vlog.updateDiscardStats(discardStats) - s.kv.opt.Debugf("Discard stats: %v", discardStats) return newTables, func() error { return decrRefs(newTables) }, nil } @@ -777,20 +969,52 @@ func containsAnyPrefixes(smallValue, largeValue []byte, listOfPrefixes [][]byte) type compactDef struct { span *otrace.Span - thisLevel *levelHandler - nextLevel *levelHandler + compactorId int + t targets + p compactionPriority + thisLevel *levelHandler + nextLevel *levelHandler top []*table.Table bot []*table.Table thisRange keyRange nextRange keyRange + splits []keyRange thisSize int64 dropPrefixes [][]byte } +// addSplits can allow us to run multiple sub-compactions in parallel across the split key ranges. +func (s *levelsController) addSplits(cd *compactDef) { + cd.splits = cd.splits[:0] + + // Pick one every 3 tables. + const N = 3 + skr := cd.thisRange + skr.extend(cd.nextRange) + + addRange := func(right []byte) { + skr.right = y.Copy(right) + cd.splits = append(cd.splits, skr) + + skr.left = skr.right + } + + for i, t := range cd.bot { + // last entry in bottom table. + if i == len(cd.bot)-1 { + addRange([]byte{}) + return + } + if i%N == N-1 { + addRange(t.Biggest()) + } + } +} + func (cd *compactDef) lockLevels() { cd.thisLevel.RLock() cd.nextLevel.RLock() @@ -808,33 +1032,127 @@ func (cd *compactDef) allTables() []*table.Table { return ret } -func (s *levelsController) fillTablesL0(cd *compactDef) bool { +func (s *levelsController) fillTablesL0ToL0(cd *compactDef) bool { + if cd.compactorId != 0 { + // Only compactor zero can work on this. + return false + } + + cd.nextLevel = s.levels[0] + cd.nextRange = keyRange{} + cd.bot = nil + cd.lockLevels() defer cd.unlockLevels() - cd.top = make([]*table.Table, len(cd.thisLevel.tables)) - copy(cd.top, cd.thisLevel.tables) - if len(cd.top) == 0 { + s.cstatus.Lock() + defer s.cstatus.Unlock() + + top := cd.thisLevel.tables + var out []*table.Table + now := time.Now() + for _, t := range top { + if t.Size() >= 2*cd.t.fileSz[0] { + // This file is already big, don't include it. + continue + } + if now.Sub(t.CreatedAt) < 10*time.Second { + // Just created it 10s ago. Don't pick for compaction. + continue + } + if _, beingCompacted := s.cstatus.tables[t.ID()]; beingCompacted { + continue + } + out = append(out, t) + } + + if len(out) < 4 { + // If we don't have enough tables to merge in L0, don't do it. return false } cd.thisRange = infRange + cd.top = out + + // Avoid any other L0 -> Lbase from happening, while this is going on. + thisLevel := s.cstatus.levels[cd.thisLevel.level] + thisLevel.ranges = append(thisLevel.ranges, infRange) + for _, t := range out { + s.cstatus.tables[t.ID()] = struct{}{} + } + + // For L0->L0 compaction, we set the target file size to max, so the output is always one file. + // This significantly decreases the L0 table stalls and improves the performance. + cd.t.fileSz[0] = math.MaxUint32 + return true +} + +func (s *levelsController) fillTablesL0ToLbase(cd *compactDef) bool { + if cd.nextLevel.level == 0 { + panic("Base level can't be zero.") + } + // We keep cd.p.adjusted > 0.0 here to allow functions in db.go to artificially trigger + // L0->Lbase compactions. Those functions wouldn't be setting the adjusted score. + if cd.p.adjusted > 0.0 && cd.p.adjusted < 1.0 { + // Do not compact to Lbase if adjusted score is less than 1.0. + return false + } + cd.lockLevels() + defer cd.unlockLevels() + + top := cd.thisLevel.tables + if len(top) == 0 { + return false + } + + var out []*table.Table + if len(cd.dropPrefixes) > 0 { + // Use all tables if drop prefix is set. We don't want to compact only a + // sub-range. We want to compact all the tables. + out = top - kr := getKeyRange(cd.top...) - left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, kr) + } else { + var kr keyRange + // cd.top[0] is the oldest file. So we start from the oldest file first. + for _, t := range top { + dkr := getKeyRange(t) + if kr.overlapsWith(dkr) { + out = append(out, t) + kr.extend(dkr) + } else { + break + } + } + } + cd.thisRange = getKeyRange(out...) + cd.top = out + + left, right := cd.nextLevel.overlappingTables(levelHandlerRLocked{}, cd.thisRange) cd.bot = make([]*table.Table, right-left) copy(cd.bot, cd.nextLevel.tables[left:right]) if len(cd.bot) == 0 { - cd.nextRange = kr + cd.nextRange = cd.thisRange } else { cd.nextRange = getKeyRange(cd.bot...) } + return s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) +} - if !s.cstatus.compareAndAdd(thisAndNextLevelRLocked{}, *cd) { - return false +// fillTablesL0 would try to fill tables from L0 to be compacted with Lbase. If +// it can not do that, it would try to compact tables from L0 -> L0. +// +// Say L0 has 10 tables. +// fillTablesL0ToLbase picks up 5 tables to compact from L0 -> L5. +// Next call to fillTablesL0 would run L0ToLbase again, which fails this time. +// So, instead, we run fillTablesL0ToL0, which picks up rest of the 5 tables to +// be compacted within L0. Additionally, it would set the compaction range in +// cstatus to inf, so no other L0 -> Lbase compactions can happen. +// Thus, L0 -> L0 must finish for the next L0 -> Lbase to begin. +func (s *levelsController) fillTablesL0(cd *compactDef) bool { + if ok := s.fillTablesL0ToLbase(cd); ok { + return true } - - return true + return s.fillTablesL0ToL0(cd) } // sortByHeuristic sorts tables in increasing order of MaxVersion, so we @@ -909,12 +1227,25 @@ func (s *levelsController) fillTables(cd *compactDef) bool { return false } -func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) { +func (s *levelsController) runCompactDef(id, l int, cd compactDef) (err error) { + if len(cd.t.fileSz) == 0 { + return errors.New("Filesizes cannot be zero. Targets are not set") + } timeStart := time.Now() thisLevel := cd.thisLevel nextLevel := cd.nextLevel + y.AssertTrue(len(cd.splits) == 0) + if thisLevel.level == 0 && nextLevel.level == 0 { + // don't do anything for L0 -> L0. + } else { + s.addSplits(&cd) + } + if len(cd.splits) == 0 { + cd.splits = append(cd.splits, keyRange{}) + } + // Table should never be moved directly between levels, always be rewritten to allow discarding // invalid versions. @@ -947,56 +1278,78 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) { // Note: For level 0, while doCompact is running, it is possible that new tables are added. // However, the tables are added only to the end, so it is ok to just delete the first table. - if dur := time.Since(timeStart); dur > 3*time.Second { - s.kv.opt.Infof("LOG Compact %d->%d, del %d tables, add %d tables, took %v\n", - thisLevel.level, nextLevel.level, len(cd.top)+len(cd.bot), - len(newTables), dur) + from := append(tablesToString(cd.top), tablesToString(cd.bot)...) + to := tablesToString(newTables) + + if dur := time.Since(timeStart); dur > 2*time.Second { + var expensive string + if dur > time.Second { + expensive = " [E]" + } + s.kv.opt.Infof("[%d]%s LOG Compact %d->%d (%d, %d -> %d tables with %d splits)."+ + " [%s] -> [%s], took %v\n", + id, expensive, thisLevel.level, nextLevel.level, len(cd.top), len(cd.bot), + len(newTables), len(cd.splits), strings.Join(from, " "), strings.Join(to, " "), + dur.Round(time.Millisecond)) } if cd.thisLevel.level != 0 && len(newTables) > 2*s.kv.opt.LevelSizeMultiplier { - s.kv.opt.Infof("This Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n", + s.kv.opt.Debugf("This Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n", len(cd.top), hex.Dump(cd.thisRange.left), hex.Dump(cd.thisRange.right)) - s.kv.opt.Infof("Next Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n", + s.kv.opt.Debugf("Next Range (numTables: %d)\nLeft:\n%s\nRight:\n%s\n", len(cd.bot), hex.Dump(cd.nextRange.left), hex.Dump(cd.nextRange.right)) } return nil } +func tablesToString(tables []*table.Table) []string { + var res []string + for _, t := range tables { + res = append(res, fmt.Sprintf("%05d", t.ID())) + } + res = append(res, ".") + return res +} + var errFillTables = errors.New("Unable to fill tables") // doCompact picks some table on level l and compacts it away to the next level. func (s *levelsController) doCompact(id int, p compactionPriority) error { l := p.level y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check. + if p.t.baseLevel == 0 { + p.t = s.levelTargets() + } + _, span := otrace.StartSpan(context.Background(), "Badger.Compaction") - span.Annotatef(nil, "Compaction level: %v", l) defer span.End() + cd := compactDef{ + compactorId: id, span: span, + p: p, + t: p.t, thisLevel: s.levels[l], - nextLevel: s.levels[l+1], dropPrefixes: p.dropPrefixes, } - s.kv.opt.Debugf("[Compactor: %d] Attempting to run compaction: %+v", id, p) - // While picking tables to be compacted, both levels' tables are expected to // remain unchanged. if l == 0 { + cd.nextLevel = s.levels[p.t.baseLevel] if !s.fillTablesL0(&cd) { return errFillTables } - } else { + cd.nextLevel = s.levels[l+1] if !s.fillTables(&cd) { return errFillTables } } defer s.cstatus.delete(cd) // Remove the ranges from compaction status. - s.kv.opt.Debugf("[Compactor: %d] Running compaction: %+v for level: %d\n", - id, p, cd.thisLevel.level) - if err := s.runCompactDef(l, cd); err != nil { + span.Annotatef(nil, "Compaction: %+v", cd) + if err := s.runCompactDef(id, l, cd); err != nil { // This compaction couldn't be done successfully. s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd) return err @@ -1023,34 +1376,16 @@ func (s *levelsController) addLevel0Table(t *table.Table) error { } for !s.levels[0].tryAddLevel0Table(t) { - // Stall. Make sure all levels are healthy before we unstall. - s.cstatus.RLock() - for i := 0; i < s.kv.opt.MaxLevels; i++ { - s.kv.opt.Debugf("level=%d. Status=%s Size=%d\n", - i, s.cstatus.levels[i].debug(), s.levels[i].getTotalSize()) - } - s.cstatus.RUnlock() + // Before we unstall, we need to make sure that level 0 is healthy. timeStart := time.Now() - - // Before we unstall, we need to make sure that level 0 is healthy. Otherwise, we - // will very quickly fill up level 0 again. - for i := 0; ; i++ { - // It's crucial that this behavior replicates pickCompactLevels' behavior in - // computing compactability in order to guarantee progress. - // Break the loop once L0 has enough space to accommodate new tables. - if !s.isLevel0Compactable() { - break - } + for s.levels[0].numTables() >= s.kv.opt.NumLevelZeroTablesStall { time.Sleep(10 * time.Millisecond) - if i%100 == 0 { - prios := s.pickCompactLevels() - s.kv.opt.Debugf("Waiting to add level 0 table. Compaction priorities: %+v\n", prios) - i = 0 - } } - if dur := time.Since(timeStart); dur > time.Second { + dur := time.Since(timeStart) + if dur > time.Second { s.kv.opt.Infof("L0 was stalled for %s\n", dur.Round(time.Millisecond)) } + atomic.AddInt64(&s.l0stallsMs, int64(dur.Round(time.Millisecond))) } return nil @@ -1160,6 +1495,39 @@ func (s *levelsController) getTableInfo() (result []TableInfo) { return } +type LevelInfo struct { + Level int + NumTables int + Size int64 + TargetSize int64 + TargetFileSize int64 + IsBaseLevel bool + Score float64 + Adjusted float64 +} + +func (s *levelsController) getLevelInfo() []LevelInfo { + t := s.levelTargets() + prios := s.pickCompactLevels() + result := make([]LevelInfo, len(s.levels)) + for i, l := range s.levels { + l.RLock() + result[i].Level = i + result[i].Size = l.totalSize + result[i].NumTables = len(l.tables) + l.RUnlock() + + result[i].TargetSize = t.targetSz[i] + result[i].TargetFileSize = t.fileSz[i] + result[i].IsBaseLevel = t.baseLevel == i + } + for _, p := range prios { + result[p.level].Score = p.score + result[p.level].Adjusted = p.adjusted + } + return result +} + // verifyChecksum verifies checksum for all tables on all levels. func (s *levelsController) verifyChecksum() error { var tables []*table.Table diff --git a/levels_test.go b/levels_test.go index 12c054d29..ee8223a6b 100644 --- a/levels_test.go +++ b/levels_test.go @@ -55,8 +55,10 @@ func createAndOpen(db *DB, td []keyValVersion, level int) { }); err != nil { panic(err) } + db.lc.levels[level].Lock() // Add table to the given level. db.lc.levels[level].tables = append(db.lc.levels[level].tables, tab) + db.lc.levels[level].Unlock() } type keyValVersion struct { @@ -182,8 +184,10 @@ func TestCompaction(t *testing.T) { nextLevel: db.lc.levels[1], top: db.lc.levels[0].tables, bot: db.lc.levels[1].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(0, cdef)) + cdef.t.baseLevel = 1 + require.NoError(t, db.lc.runCompactDef(-1, 0, cdef)) // foo version 2 should be dropped after compaction. getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}}) }) @@ -212,8 +216,10 @@ func TestCompaction(t *testing.T) { nextLevel: db.lc.levels[1], top: db.lc.levels[0].tables, bot: db.lc.levels[1].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(0, cdef)) + cdef.t.baseLevel = 1 + require.NoError(t, db.lc.runCompactDef(-1, 0, cdef)) // foo version 3 (both) should be dropped after compaction. getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 4, 0}, {"fooz", "baz", 1, 0}}) }) @@ -245,8 +251,10 @@ func TestCompaction(t *testing.T) { nextLevel: db.lc.levels[1], top: db.lc.levels[0].tables, bot: db.lc.levels[1].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(0, cdef)) + cdef.t.baseLevel = 1 + require.NoError(t, db.lc.runCompactDef(-1, 0, cdef)) // foo version 2 and version 1 should be dropped after compaction. getAllAndCheck(t, db, []keyValVersion{ {"foo", "bar", 3, 0}, {"foo", "bar", 0, 0}, {"fooz", "baz", 1, 0}, @@ -272,8 +280,10 @@ func TestCompaction(t *testing.T) { nextLevel: db.lc.levels[2], top: db.lc.levels[1].tables, bot: db.lc.levels[2].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(1, cdef)) + cdef.t.baseLevel = 2 + require.NoError(t, db.lc.runCompactDef(-1, 1, cdef)) // foo version 2 should be dropped after compaction. getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 3, 0}, {"fooz", "baz", 1, 0}}) }) @@ -303,8 +313,10 @@ func TestCompaction(t *testing.T) { nextLevel: db.lc.levels[2], top: db.lc.levels[1].tables, bot: db.lc.levels[2].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(1, cdef)) + cdef.t.baseLevel = 2 + require.NoError(t, db.lc.runCompactDef(-1, 1, cdef)) // foo bar version 2 should be dropped after compaction. fooz // baz version 1 will remain because overlap exists, which is // expected because `hasOverlap` is only checked once at the @@ -321,8 +333,10 @@ func TestCompaction(t *testing.T) { nextLevel: db.lc.levels[3], top: db.lc.levels[2].tables, bot: db.lc.levels[3].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(2, cdef)) + cdef.t.baseLevel = 3 + require.NoError(t, db.lc.runCompactDef(-1, 2, cdef)) // everything should be removed now getAllAndCheck(t, db, []keyValVersion{}) }) @@ -350,8 +364,10 @@ func TestCompaction(t *testing.T) { nextLevel: db.lc.levels[2], top: db.lc.levels[1].tables, bot: db.lc.levels[2].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(1, cdef)) + cdef.t.baseLevel = 2 + require.NoError(t, db.lc.runCompactDef(-1, 1, cdef)) // the top table at L1 doesn't overlap L3, but the bottom table at L2 // does, delete keys should not be removed. getAllAndCheck(t, db, []keyValVersion{ @@ -379,8 +395,10 @@ func TestCompaction(t *testing.T) { nextLevel: db.lc.levels[2], top: db.lc.levels[1].tables, bot: db.lc.levels[2].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(1, cdef)) + cdef.t.baseLevel = 2 + require.NoError(t, db.lc.runCompactDef(-1, 1, cdef)) // foo version 2 should be dropped after compaction. getAllAndCheck(t, db, []keyValVersion{{"fooo", "barr", 2, 0}}) }) @@ -415,8 +433,10 @@ func TestCompactionTwoVersions(t *testing.T) { nextLevel: db.lc.levels[2], top: db.lc.levels[1].tables, bot: db.lc.levels[2].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(1, cdef)) + cdef.t.baseLevel = 2 + require.NoError(t, db.lc.runCompactDef(-1, 1, cdef)) // Nothing should be dropped after compaction because number of // versions to keep is 2. getAllAndCheck(t, db, []keyValVersion{ @@ -431,8 +451,10 @@ func TestCompactionTwoVersions(t *testing.T) { nextLevel: db.lc.levels[3], top: db.lc.levels[2].tables, bot: db.lc.levels[3].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(2, cdef)) + cdef.t.baseLevel = 3 + require.NoError(t, db.lc.runCompactDef(-1, 2, cdef)) getAllAndCheck(t, db, []keyValVersion{ {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, @@ -468,8 +490,10 @@ func TestCompactionAllVersions(t *testing.T) { nextLevel: db.lc.levels[2], top: db.lc.levels[1].tables, bot: db.lc.levels[2].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(1, cdef)) + cdef.t.baseLevel = 2 + require.NoError(t, db.lc.runCompactDef(-1, 1, cdef)) // Nothing should be dropped after compaction because all versions // should be kept. getAllAndCheck(t, db, []keyValVersion{ @@ -484,8 +508,10 @@ func TestCompactionAllVersions(t *testing.T) { nextLevel: db.lc.levels[3], top: db.lc.levels[2].tables, bot: db.lc.levels[3].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(2, cdef)) + cdef.t.baseLevel = 3 + require.NoError(t, db.lc.runCompactDef(-1, 2, cdef)) getAllAndCheck(t, db, []keyValVersion{ {"foo", "bar", 3, 0}, {"foo", "bar", 2, 0}, @@ -511,8 +537,10 @@ func TestCompactionAllVersions(t *testing.T) { nextLevel: db.lc.levels[2], top: db.lc.levels[1].tables, bot: db.lc.levels[2].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(1, cdef)) + cdef.t.baseLevel = 2 + require.NoError(t, db.lc.runCompactDef(-1, 1, cdef)) // foo version 2 should be dropped after compaction. getAllAndCheck(t, db, []keyValVersion{{"fooo", "barr", 2, 0}}) }) @@ -547,8 +575,10 @@ func TestDiscardTs(t *testing.T) { nextLevel: db.lc.levels[1], top: db.lc.levels[0].tables, bot: db.lc.levels[1].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(0, cdef)) + cdef.t.baseLevel = 1 + require.NoError(t, db.lc.runCompactDef(-1, 0, cdef)) // No keys should be dropped. getAllAndCheck(t, db, []keyValVersion{ {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, @@ -578,8 +608,10 @@ func TestDiscardTs(t *testing.T) { nextLevel: db.lc.levels[1], top: db.lc.levels[0].tables, bot: db.lc.levels[1].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(0, cdef)) + cdef.t.baseLevel = 1 + require.NoError(t, db.lc.runCompactDef(-1, 0, cdef)) // foo1 and foo2 should be dropped. getAllAndCheck(t, db, []keyValVersion{ {"foo", "bar", 4, 0}, {"foo", "bar", 3, 0}, {"fooz", "baz", 2, 0}, @@ -609,8 +641,10 @@ func TestDiscardTs(t *testing.T) { nextLevel: db.lc.levels[1], top: db.lc.levels[0].tables, bot: db.lc.levels[1].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(0, cdef)) + cdef.t.baseLevel = 1 + require.NoError(t, db.lc.runCompactDef(-1, 0, cdef)) // Only one version of every key should be left. getAllAndCheck(t, db, []keyValVersion{{"foo", "bar", 4, 0}, {"fooz", "baz", 3, 0}}) }) @@ -650,8 +684,10 @@ func TestDiscardFirstVersion(t *testing.T) { nextLevel: db.lc.levels[1], top: db.lc.levels[0].tables, bot: db.lc.levels[1].tables, + t: db.lc.levelTargets(), } - require.NoError(t, db.lc.runCompactDef(0, cdef)) + cdef.t.baseLevel = 1 + require.NoError(t, db.lc.runCompactDef(-1, 0, cdef)) // - Version 10, 9 lie above version 7 so they should be there. // - Version 4, 3, 2 lie below the discardTs but they don't have the @@ -673,6 +709,8 @@ func TestDiscardFirstVersion(t *testing.T) { // This test ensures we don't stall when L1's size is greater than opt.LevelOneSize. // We should stall only when L0 tables more than the opt.NumLevelZeroTableStall. func TestL1Stall(t *testing.T) { + // TODO(ibrahim): Is this test still valid? + t.Skip() opt := DefaultOptions("") // Disable all compactions. opt.NumCompactors = 0 @@ -681,7 +719,7 @@ func TestL1Stall(t *testing.T) { // Addition of new tables will stall if there are 4 or more L0 tables. opt.NumLevelZeroTablesStall = 4 // Level 1 size is 10 bytes. - opt.LevelOneSize = 10 + opt.BaseLevelSize = 10 runBadgerTest(t, &opt, func(t *testing.T, db *DB) { // Level 0 has 4 tables. @@ -739,6 +777,8 @@ func createEmptyTable(db *DB) *table.Table { } func TestL0Stall(t *testing.T) { + // TODO(ibrahim): Is this test still valid? + t.Skip() opt := DefaultOptions("") // Disable all compactions. opt.NumCompactors = 0 @@ -888,8 +928,7 @@ func TestKeyVersions(t *testing.T) { inMemoryOpt := DefaultOptions(""). WithSyncWrites(false). WithInMemory(true). - WithLogRotatesToFlush(math.MaxInt32). - WithMaxTableSize(4 << 20) + WithMemTableSize(4 << 20) t.Run("disk", func(t *testing.T) { t.Run("small table", func(t *testing.T) { diff --git a/managed_db_test.go b/managed_db_test.go index e7f1b962a..dd2d099ea 100644 --- a/managed_db_test.go +++ b/managed_db_test.go @@ -416,13 +416,14 @@ func TestDropPrefix(t *testing.T) { populate(db) require.Equal(t, int(N), numKeys(db)) require.NoError(t, db.DropPrefix([]byte("key"))) - db.Close() + require.Equal(t, 0, numKeys(db)) + require.NoError(t, db.Close()) // Ensure that value log is correctly replayed. db2, err := Open(opts) require.NoError(t, err) require.Equal(t, 0, numKeys(db2)) - db2.Close() + require.NoError(t, db2.Close()) } func TestDropPrefixWithPendingTxn(t *testing.T) { @@ -605,7 +606,7 @@ func TestWriteBatchManagedMode(t *testing.T) { } opt := DefaultOptions("") opt.managedTxns = true - opt.MaxTableSize = 1 << 20 // This would create multiple transactions in write batch. + opt.BaseTableSize = 1 << 20 // This would create multiple transactions in write batch. runBadgerTest(t, &opt, func(t *testing.T, db *DB) { wb := db.NewWriteBatchAt(1) defer wb.Cancel() @@ -651,7 +652,7 @@ func TestWriteBatchManaged(t *testing.T) { } opt := DefaultOptions("") opt.managedTxns = true - opt.MaxTableSize = 1 << 15 // This would create multiple transactions in write batch. + opt.BaseTableSize = 1 << 15 // This would create multiple transactions in write batch. runBadgerTest(t, &opt, func(t *testing.T, db *DB) { wb := db.NewManagedWriteBatch() defer wb.Cancel() @@ -720,7 +721,7 @@ func TestWriteBatchDuplicate(t *testing.T) { t.Run("writebatch", func(t *testing.T) { opt := DefaultOptions("") - opt.MaxTableSize = 1 << 15 // This would create multiple transactions in write batch. + opt.BaseTableSize = 1 << 15 // This would create multiple transactions in write batch. runBadgerTest(t, &opt, func(t *testing.T, db *DB) { wb := db.NewWriteBatch() @@ -736,7 +737,7 @@ func TestWriteBatchDuplicate(t *testing.T) { }) t.Run("writebatch at", func(t *testing.T) { opt := DefaultOptions("") - opt.MaxTableSize = 1 << 15 // This would create multiple transactions in write batch. + opt.BaseTableSize = 1 << 15 // This would create multiple transactions in write batch. opt.managedTxns = true runBadgerTest(t, &opt, func(t *testing.T, db *DB) { @@ -755,7 +756,7 @@ func TestWriteBatchDuplicate(t *testing.T) { t.Run("managed writebatch", func(t *testing.T) { opt := DefaultOptions("") opt.managedTxns = true - opt.MaxTableSize = 1 << 15 // This would create multiple transactions in write batch. + opt.BaseTableSize = 1 << 15 // This would create multiple transactions in write batch. runBadgerTest(t, &opt, func(t *testing.T, db *DB) { wb := db.NewManagedWriteBatch() defer wb.Cancel() diff --git a/manifest_test.go b/manifest_test.go index b2b5f0a01..b6971dda1 100644 --- a/manifest_test.go +++ b/manifest_test.go @@ -178,14 +178,16 @@ func TestOverlappingKeyRangeError(t *testing.T) { thisLevel: lh0, nextLevel: lh1, span: span, + t: kv.lc.levelTargets(), } + cd.t.baseLevel = 1 manifest := createManifest() lc, err := newLevelsController(kv, &manifest) require.NoError(t, err) done = lc.fillTablesL0(&cd) require.Equal(t, true, done) - lc.runCompactDef(0, cd) + lc.runCompactDef(-1, 0, cd) span.End() _, span = otrace.StartSpan(context.Background(), "Badger.Compaction") @@ -199,9 +201,11 @@ func TestOverlappingKeyRangeError(t *testing.T) { thisLevel: lh0, nextLevel: lh1, span: span, + t: kv.lc.levelTargets(), } + cd.t.baseLevel = 1 lc.fillTablesL0(&cd) - lc.runCompactDef(0, cd) + lc.runCompactDef(-1, 0, cd) } func TestManifestRewrite(t *testing.T) { diff --git a/memtable.go b/memtable.go index ae887d134..724de7c2b 100644 --- a/memtable.go +++ b/memtable.go @@ -85,6 +85,12 @@ func (db *DB) openMemTables(opt Options) error { if err != nil { return y.Wrapf(err, "while opening fid: %d", fid) } + // If this memtable is empty we don't need to add it. This is a + // memtable that was completely truncated. + if mt.sl.Empty() { + mt.DecrRef() + continue + } // These should no longer be written to. So, make them part of the imm. db.imm = append(db.imm, mt) } diff --git a/options.go b/options.go index 8184aaac2..12ba83cb8 100644 --- a/options.go +++ b/options.go @@ -50,11 +50,15 @@ type Options struct { // Fine tuning options. - MaxTableSize int64 + MemTableSize int64 + BaseTableSize int64 + BaseLevelSize int64 LevelSizeMultiplier int + TableSizeMultiplier int MaxLevels int - ValueThreshold int - NumMemtables int + + ValueThreshold int + NumMemtables int // Changing BlockSize across DB runs will not break badger. The block size is // read from the block index stored at the end of the table. BlockSize int @@ -65,13 +69,11 @@ type Options struct { NumLevelZeroTables int NumLevelZeroTablesStall int - LevelOneSize int64 ValueLogFileSize int64 ValueLogMaxEntries uint32 NumCompactors int CompactL0OnClose bool - LogRotatesToFlush int32 ZSTDCompressionLevel int // When set, checksum will be validated for each entry read from the value log file. @@ -109,13 +111,17 @@ type Options struct { // Feel free to modify these to suit your needs with the WithX methods. func DefaultOptions(path string) Options { return Options{ - Dir: path, - ValueDir: path, - LevelOneSize: 256 << 20, - LevelSizeMultiplier: 15, - MaxLevels: 7, - MaxTableSize: 64 << 20, - NumCompactors: 2, // Run at least 2 compactors. One is dedicated for L0. + Dir: path, + ValueDir: path, + + MemTableSize: 64 << 20, + BaseTableSize: 2 << 20, + BaseLevelSize: 10 << 20, + TableSizeMultiplier: 2, + LevelSizeMultiplier: 10, + MaxLevels: 7, + + NumCompactors: 4, // Run at least 2 compactors. Zero-th compactor prioritizes L0. NumLevelZeroTables: 5, NumLevelZeroTablesStall: 15, NumMemtables: 5, @@ -125,8 +131,8 @@ func DefaultOptions(path string) Options { NumVersionsToKeep: 1, CompactL0OnClose: true, VerifyValueChecksum: false, - Compression: options.None, - BlockCacheSize: 0, + Compression: options.Snappy, + BlockCacheSize: 256 << 20, IndexCacheSize: 0, // The following benchmarks were done on a 4 KB block size (default block size). The @@ -149,7 +155,6 @@ func DefaultOptions(path string) Options { ValueLogMaxEntries: 1000000, ValueThreshold: 1 << 10, // 1 KB. Logger: defaultLogger(INFO), - LogRotatesToFlush: 2, EncryptionKey: []byte{}, EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days. DetectConflicts: true, @@ -160,7 +165,7 @@ func buildTableOptions(opt Options) table.Options { return table.Options{ SyncWrites: opt.SyncWrites, ReadOnly: opt.ReadOnly, - TableSize: uint64(opt.MaxTableSize), + TableSize: uint64(opt.BaseTableSize), BlockSize: opt.BlockSize, BloomFalsePositive: opt.BloomFalsePositive, ChkMode: opt.ChecksumVerificationMode, @@ -269,13 +274,13 @@ func (opt Options) WithLoggingLevel(val loggingLevel) Options { return opt } -// WithMaxTableSize returns a new Options value with MaxTableSize set to the given value. +// WithBaseTableSize returns a new Options value with MaxTableSize set to the given value. // -// MaxTableSize sets the maximum size in bytes for each LSM table or file. +// BaseTableSize sets the maximum size in bytes for LSM table or file in the base level. // -// The default value of MaxTableSize is 64MB. -func (opt Options) WithMaxTableSize(val int64) Options { - opt.MaxTableSize = val +// The default value of BaseTableSize is 2MB. +func (opt Options) WithBaseTableSize(val int64) Options { + opt.BaseTableSize = val return opt } @@ -286,7 +291,7 @@ func (opt Options) WithMaxTableSize(val int64) Options { // Once a level grows to be larger than this ratio allowed, the compaction process will be // triggered. // -// The default value of LevelSizeMultiplier is 15. +// The default value of LevelSizeMultiplier is 10. func (opt Options) WithLevelSizeMultiplier(val int) Options { opt.LevelSizeMultiplier = val return opt @@ -323,6 +328,16 @@ func (opt Options) WithNumMemtables(val int) Options { return opt } +// WithMemTableSize returns a new Options value with MemTableSize set to the given value. +// +// MemTableSize sets the maximum size in bytes for memtable table. +// +// The default value of MemTableSize is 64MB. +func (opt Options) WithMemTableSize(val int64) Options { + opt.MemTableSize = val + return opt +} + // WithBloomFalsePositive returns a new Options value with BloomFalsePositive set // to the given value. // @@ -350,10 +365,7 @@ func (opt Options) WithBlockSize(val int) Options { return opt } -// WithNumLevelZeroTables returns a new Options value with NumLevelZeroTables set to the given -// value. -// -// NumLevelZeroTables sets the maximum number of Level 0 tables before compaction starts. +// WithNumLevelZeroTables sets the maximum number of Level 0 tables before compaction starts. // // The default value of NumLevelZeroTables is 5. func (opt Options) WithNumLevelZeroTables(val int) Options { @@ -361,10 +373,7 @@ func (opt Options) WithNumLevelZeroTables(val int) Options { return opt } -// WithNumLevelZeroTablesStall returns a new Options value with NumLevelZeroTablesStall set to the -// given value. -// -// NumLevelZeroTablesStall sets the number of Level 0 tables that once reached causes the DB to +// WithNumLevelZeroTablesStall sets the number of Level 0 tables that once reached causes the DB to // stall until compaction succeeds. // // The default value of NumLevelZeroTablesStall is 10. @@ -373,19 +382,15 @@ func (opt Options) WithNumLevelZeroTablesStall(val int) Options { return opt } -// WithLevelOneSize returns a new Options value with LevelOneSize set to the given value. -// -// LevelOneSize sets the maximum total size for Level 1. +// WithBaseLevelSize sets the maximum size target for the base level. // -// The default value of LevelOneSize is 20MB. -func (opt Options) WithLevelOneSize(val int64) Options { - opt.LevelOneSize = val +// The default value is 10MB. +func (opt Options) WithBaseLevelSize(val int64) Options { + opt.BaseLevelSize = val return opt } -// WithValueLogFileSize returns a new Options value with ValueLogFileSize set to the given value. -// -// ValueLogFileSize sets the maximum size of a single value log file. +// WithValueLogFileSize sets the maximum size of a single value log file. // // The default value of ValueLogFileSize is 1GB. func (opt Options) WithValueLogFileSize(val int64) Options { @@ -393,12 +398,9 @@ func (opt Options) WithValueLogFileSize(val int64) Options { return opt } -// WithValueLogMaxEntries returns a new Options value with ValueLogMaxEntries set to the given -// value. -// -// ValueLogMaxEntries sets the maximum number of entries a value log file can hold approximately. -// A actual size limit of a value log file is the minimum of ValueLogFileSize and -// ValueLogMaxEntries. +// WithValueLogMaxEntries sets the maximum number of entries a value log file +// can hold approximately. A actual size limit of a value log file is the +// minimum of ValueLogFileSize and ValueLogMaxEntries. // // The default value of ValueLogMaxEntries is one million (1000000). func (opt Options) WithValueLogMaxEntries(val uint32) Options { @@ -406,10 +408,8 @@ func (opt Options) WithValueLogMaxEntries(val uint32) Options { return opt } -// WithNumCompactors returns a new Options value with NumCompactors set to the given value. -// -// NumCompactors sets the number of compaction workers to run concurrently. -// Setting this to zero stops compactions, which could eventually cause writes to block forever. +// WithNumCompactors sets the number of compaction workers to run concurrently. Setting this to +// zero stops compactions, which could eventually cause writes to block forever. // // The default value of NumCompactors is 2. One is dedicated just for L0 and L1. func (opt Options) WithNumCompactors(val int) Options { @@ -417,11 +417,9 @@ func (opt Options) WithNumCompactors(val int) Options { return opt } -// WithCompactL0OnClose returns a new Options value with CompactL0OnClose set to the given value. -// -// CompactL0OnClose determines whether Level 0 should be compacted before closing the DB. -// This ensures that both reads and writes are efficient when the DB is opened later. -// CompactL0OnClose is set to true if KeepL0InMemory is set to true. +// WithCompactL0OnClose determines whether Level 0 should be compacted before closing the DB. This +// ensures that both reads and writes are efficient when the DB is opened later. CompactL0OnClose +// is set to true if KeepL0InMemory is set to true. // // The default value of CompactL0OnClose is true. func (opt Options) WithCompactL0OnClose(val bool) Options { @@ -429,23 +427,7 @@ func (opt Options) WithCompactL0OnClose(val bool) Options { return opt } -// WithLogRotatesToFlush returns a new Options value with LogRotatesToFlush set to the given value. -// -// LogRotatesToFlush sets the number of value log file rotates after which the Memtables are -// flushed to disk. This is useful in write loads with fewer keys and larger values. This work load -// would fill up the value logs quickly, while not filling up the Memtables. Thus, on a crash -// and restart, the value log head could cause the replay of a good number of value log files -// which can slow things on start. -// -// The default value of LogRotatesToFlush is 2. -func (opt Options) WithLogRotatesToFlush(val int32) Options { - opt.LogRotatesToFlush = val - return opt -} - -// WithEncryptionKey return a new Options value with EncryptionKey set to the given value. -// -// EncryptionKey is used to encrypt the data with AES. Type of AES is used based on the key +// WithEncryptionKey is used to encrypt the data with AES. Type of AES is used based on the key // size. For example 16 bytes will use AES-128. 24 bytes will use AES-192. 32 bytes will // use AES-256. func (opt Options) WithEncryptionKey(key []byte) Options { @@ -463,10 +445,9 @@ func (opt Options) WithEncryptionKeyRotationDuration(d time.Duration) Options { return opt } -// WithCompression returns a new Options value with Compression set to the given value. -// -// When compression is enabled, every block will be compressed using the specified algorithm. -// This option doesn't affect existing tables. Only the newly created tables will be compressed. +// WithCompression is used to enable or disable compression. When compression is enabled, every +// block will be compressed using the specified algorithm. This option doesn't affect existing +// tables. Only the newly created tables will be compressed. // // The default compression algorithm used is zstd when built with Cgo. Without Cgo, the default is // snappy. Compression is enabled by default. @@ -475,12 +456,9 @@ func (opt Options) WithCompression(cType options.CompressionType) Options { return opt } -// WithVerifyValueChecksum returns a new Options value with VerifyValueChecksum set to -// the given value. -// -// When VerifyValueChecksum is set to true, checksum will be verified for every entry read -// from the value log. If the value is stored in SST (value size less than value threshold) then the -// checksum validation will not be done. +// WithVerifyValueChecksum is used to set VerifyValueChecksum. When VerifyValueChecksum is set to +// true, checksum will be verified for every entry read from the value log. If the value is stored +// in SST (value size less than value threshold) then the checksum validation will not be done. // // The default value of VerifyValueChecksum is False. func (opt Options) WithVerifyValueChecksum(val bool) Options { diff --git a/stream_writer.go b/stream_writer.go index c9e0c2eda..95c62d544 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -255,6 +255,9 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) { bopts := buildTableOptions(sw.db.opt) bopts.DataKey = dk + for i := 2; i < sw.db.opt.MaxLevels; i++ { + bopts.TableSize *= uint64(sw.db.opt.TableSizeMultiplier) + } w := &sortedWriter{ db: sw.db, streamID: streamID, @@ -320,7 +323,7 @@ func (w *sortedWriter) Add(key []byte, vs y.ValueStruct) error { sameKey := y.SameKey(key, w.lastKey) // Same keys should go into the same SSTable. - if !sameKey && w.builder.ReachedCapacity(uint64(float64(w.db.opt.MaxTableSize)*0.9)) { + if !sameKey && w.builder.ReachedCapacity() { if err := w.send(false); err != nil { return err } @@ -400,21 +403,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error { } lc := w.db.lc - var lhandler *levelHandler - // We should start the levels from 1. - y.AssertTrue(len(lc.levels) > 1) - for _, l := range lc.levels[1:] { - ratio := float64(l.getTotalSize()) / float64(l.maxTotalSize) - if ratio < 1.0 { - lhandler = l - break - } - } - if lhandler == nil { - // If we're exceeding the size of the lowest level, shove it in the lowest level. Can't do - // better than that. - lhandler = lc.levels[len(lc.levels)-1] - } + lhandler := lc.levels[len(lc.levels)-1] // Now that table can be opened successfully, let's add this to the MANIFEST. change := &pb.ManifestChange{ Id: tbl.ID(), diff --git a/stream_writer_test.go b/stream_writer_test.go index 001aa8442..f440257fc 100644 --- a/stream_writer_test.go +++ b/stream_writer_test.go @@ -325,16 +325,23 @@ func TestStreamWriter5(t *testing.T) { func TestStreamWriter6(t *testing.T) { runBadgerTest(t, nil, func(t *testing.T, db *DB) { list := &pb.KVList{} - str := []string{"a", "a", "b", "b", "c", "c"} - ver := 1 + str := []string{"a", "b", "c"} + ver := uint64(0) + // The baseTable size is 32 KB (1<<15) and the max table size for level + // 6 table is 1 mb (look at newWrite function). Since all the tables + // will be written to level 6, we need to insert at least 1 mb of data. + // Setting keycount below 32 would cause this test to fail. + keyCount := 40 for i := range str { - kv := &pb.KV{ - Key: bytes.Repeat([]byte(str[i]), int(db.opt.MaxTableSize)), - Value: []byte("val"), - Version: uint64(ver), + for j := 0; j < keyCount; j++ { + ver++ + kv := &pb.KV{ + Key: bytes.Repeat([]byte(str[i]), int(db.opt.BaseTableSize)), + Value: []byte("val"), + Version: uint64(keyCount - j), + } + list.Kv = append(list.Kv, kv) } - list.Kv = append(list.Kv, kv) - ver = (ver + 1) % 2 } // list has 3 pairs for equal keys. Since each Key has size equal to MaxTableSize @@ -347,12 +354,8 @@ func TestStreamWriter6(t *testing.T) { tables := db.Tables() require.Equal(t, 3, len(tables), "Count of tables not matching") for _, tab := range tables { - if tab.Level > 0 { - require.Equal(t, 2, int(tab.KeyCount), - fmt.Sprintf("failed for level: %d", tab.Level)) - } else { - require.Equal(t, 1, int(tab.KeyCount)) // level 0 table will have head key - } + require.Equal(t, keyCount, int(tab.KeyCount), + fmt.Sprintf("failed for level: %d", tab.Level)) } require.NoError(t, db.Close()) db, err := Open(db.opt) diff --git a/table/builder.go b/table/builder.go index 047e2b092..b42eec8ea 100644 --- a/table/builder.go +++ b/table/builder.go @@ -108,6 +108,7 @@ func NewTableBuilder(opts Options) *Builder { opt: &opts, offsets: z.NewBuffer(1 << 20), } + b.opt.tableCapacity = uint64(float64(b.opt.TableSize) * 0.9) // If encryption or compression is not enabled, do not start compression/encryption goroutines // and write directly to the buffer. @@ -370,7 +371,7 @@ func (b *Builder) Add(key []byte, value y.ValueStruct, valueLen uint32) { // at the end. The diff can vary. // ReachedCapacity returns true if we... roughly (?) reached capacity? -func (b *Builder) ReachedCapacity(capacity uint64) bool { +func (b *Builder) ReachedCapacity() bool { blocksSize := atomic.LoadUint32(&b.actualSize) + // actual length of current buffer uint32(len(b.entryOffsets)*4) + // all entry offsets size 4 + // count of all entry offsets @@ -381,7 +382,7 @@ func (b *Builder) ReachedCapacity(capacity uint64) bool { 4 + // Index length uint32(b.offsets.LenNoPadding()) - return uint64(estimateSz) > capacity + return uint64(estimateSz) > b.opt.tableCapacity } // Finish finishes the table by appending the index. diff --git a/table/table.go b/table/table.go index d96fa805f..5a9f21843 100644 --- a/table/table.go +++ b/table/table.go @@ -29,6 +29,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "unsafe" "github.com/golang/protobuf/proto" @@ -55,7 +56,8 @@ type Options struct { ReadOnly bool // Maximum size of the table. - TableSize uint64 + TableSize uint64 + tableCapacity uint64 // 0.9x TableSize. // ChkMode is the checksum verification mode for Table. ChkMode options.ChecksumVerificationMode @@ -103,7 +105,8 @@ type Table struct { smallest, biggest []byte // Smallest and largest keys (with timestamps). id uint64 // file id, part of filename - Checksum []byte + Checksum []byte + CreatedAt time.Time // Stores the total size of key-values stored in this table (including the size on vlog). estimatedSize uint32 indexStart int @@ -273,6 +276,7 @@ func OpenTable(mf *z.MmapFile, opts Options) (*Table, error) { opt: &opts, IsInmemory: false, tableSize: int(fileInfo.Size()), + CreatedAt: fileInfo.ModTime(), } if err := t.initBiggestAndSmallest(); err != nil { diff --git a/txn_test.go b/txn_test.go index 46a2915ca..ffa8c1ff7 100644 --- a/txn_test.go +++ b/txn_test.go @@ -844,8 +844,8 @@ func TestArmV7Issue311Fix(t *testing.T) { db, err := Open(DefaultOptions(dir). WithValueLogFileSize(16 << 20). - WithLevelOneSize(8 << 20). - WithMaxTableSize(2 << 20). + WithBaseLevelSize(8 << 20). + WithBaseTableSize(2 << 20). WithSyncWrites(false)) require.NoError(t, err) diff --git a/value.go b/value.go index 862a83a4d..54a4ce618 100644 --- a/value.go +++ b/value.go @@ -798,8 +798,9 @@ func (vlog *valueLog) write(reqs []*request) error { n := uint32(buf.Len()) endOffset := atomic.AddUint32(&vlog.writableLogOffset, n) + // Increase the file size if we cannot accommodate this entry. if int(endOffset) >= len(curlf.Data) { - return y.Wrapf(ErrTxnTooBig, "endOffset: %d len: %d\n", endOffset, len(curlf.Data)) + curlf.Truncate(int64(endOffset)) } start := int(endOffset - n) @@ -821,7 +822,6 @@ func (vlog *valueLog) write(reqs []*request) error { return err } curlf = newlf - atomic.AddInt32(&vlog.db.logRotates, 1) } return nil } diff --git a/value_test.go b/value_test.go index 6ba065d61..a6fb91881 100644 --- a/value_test.go +++ b/value_test.go @@ -432,8 +432,10 @@ func TestPersistLFDiscardStats(t *testing.T) { require.NoError(t, err) defer removeDir(dir) opt := getTestOptions(dir) + // Force more compaction by reducing the number of L0 tables. + opt.NumLevelZeroTables = 1 opt.ValueLogFileSize = 1 << 20 - // avoid compaction on close, so that discard map remains same + // Avoid compaction on close so that the discard map remains the same. opt.CompactL0OnClose = false db, err := Open(opt) @@ -868,7 +870,7 @@ func TestBug578(t *testing.T) { db, err := Open(DefaultOptions(dir). WithValueLogMaxEntries(64). - WithMaxTableSize(1 << 13)) + WithBaseTableSize(1 << 13)) require.NoError(t, err) h := testHelper{db: db, t: t} @@ -957,6 +959,7 @@ func BenchmarkReadWrite(b *testing.B) { } // Regression test for https://github.com/dgraph-io/badger/issues/817 +// This test verifies if fully corrupted memtables are deleted on reopen. func TestValueLogTruncate(t *testing.T) { dir, err := ioutil.TempDir("", "badger-test") require.NoError(t, err)