Skip to content

Commit

Permalink
Add separate cache for bloomfilters (#1260)
Browse files Browse the repository at this point in the history
#1256 showed that a single
cache might not be enough to store the data blocks and the bloom
filters.
This commit adds a separate cache for the bloom filters. It
also adds a new flag `LoadBloomsOnOpen` which determines
whether the bloom filters should be loaded when the table is
opened.

The default value of `MaxBfCacheSize` is zero and the default value of
`LoadBloomsOnOpen` is true.

This change has significant performance improvement on read speeds
because a single cache would lead to bloom filter eviction and we
would read the bloom filter from the disk.

(cherry picked from commit eaf64c0)
  • Loading branch information
Ibrahim Jarif committed Mar 24, 2020
1 parent fbddfd2 commit 77308f2
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 21 deletions.
32 changes: 29 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type DB struct {
pub *publisher
registry *KeyRegistry
blockCache *ristretto.Cache
bfCache *ristretto.Cache
}

const (
Expand Down Expand Up @@ -304,7 +305,21 @@ func Open(opt Options) (db *DB, err error) {
}
db.blockCache, err = ristretto.NewCache(&config)
if err != nil {
return nil, errors.Wrap(err, "failed to create cache")
return nil, errors.Wrap(err, "failed to create data cache")
}
}

if opt.MaxBfCacheSize > 0 {
config := ristretto.Config{
// Use 5% of cache memory for storing counters.
NumCounters: int64(float64(opt.MaxBfCacheSize) * 0.05 * 2),
MaxCost: int64(float64(opt.MaxBfCacheSize) * 0.95),
BufferItems: 64,
Metrics: true,
}
db.blockCache, err = ristretto.NewCache(&config)
if err != nil {
return nil, errors.Wrap(err, "failed to create bf cache")
}
}

Expand Down Expand Up @@ -393,14 +408,22 @@ func Open(opt Options) (db *DB, err error) {
return db, nil
}

// CacheMetrics returns the metrics for the underlying cache.
func (db *DB) CacheMetrics() *ristretto.Metrics {
// DataCacheMetrics returns the metrics for the underlying data cache.
func (db *DB) DataCacheMetrics() *ristretto.Metrics {
	if c := db.blockCache; c != nil {
		return c.Metrics
	}
	// No data cache was configured (MaxCacheSize == 0).
	return nil
}

// BfCacheMetrics returns the metrics for the underlying bloom filter cache.
func (db *DB) BfCacheMetrics() *ristretto.Metrics {
	if c := db.bfCache; c != nil {
		return c.Metrics
	}
	// No bloom filter cache was configured (MaxBfCacheSize == 0).
	return nil
}

// Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to
// disk. Calling DB.Close() multiple times would still only close the DB once.
func (db *DB) Close() error {
Expand Down Expand Up @@ -490,6 +513,7 @@ func (db *DB) close() (err error) {
db.closers.updateSize.SignalAndWait()
db.orc.Stop()
db.blockCache.Close()
db.bfCache.Close()

db.elog.Finish()
if db.opt.InMemory {
Expand Down Expand Up @@ -957,6 +981,7 @@ func (db *DB) handleFlushTask(ft flushTask) error {
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.Cache = db.blockCache
bopts.BfCache = db.bfCache
tableData := buildL0Table(ft, bopts)

fileID := db.lc.reserveFileID()
Expand Down Expand Up @@ -1515,6 +1540,7 @@ func (db *DB) dropAll() (func(), error) {
db.lc.nextFileID = 1
db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
db.blockCache.Clear()
db.bfCache.Clear()

return resume, nil
}
Expand Down
2 changes: 2 additions & 0 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
topt.Compression = tf.Compression
topt.DataKey = dk
topt.Cache = db.blockCache
topt.BfCache = db.bfCache
t, err := table.OpenTable(fd, topt)
if err != nil {
if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
Expand Down Expand Up @@ -530,6 +531,7 @@ func (s *levelsController) compactBuildTables(
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.Cache = s.kv.blockCache
bopts.BfCache = s.kv.bfCache
builder := table.NewTableBuilder(bopts)
var numKeys, numSkips uint64
for ; it.Valid(); it.Next() {
Expand Down
34 changes: 34 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type Options struct {
BloomFalsePositive float64
KeepL0InMemory bool
MaxCacheSize int64
MaxBfCacheSize int64
LoadBloomsOnOpen bool

NumLevelZeroTables int
NumLevelZeroTablesStall int
Expand Down Expand Up @@ -130,6 +132,8 @@ func DefaultOptions(path string) Options {
VerifyValueChecksum: false,
Compression: options.None,
MaxCacheSize: 1 << 30, // 1 GB
MaxBfCacheSize: 0,
LoadBloomsOnOpen: true,
// The following benchmarks were done on a 4 KB block size (default block size). The
// compression is ratio supposed to increase with increasing compression level but since the
// input for compression algorithm is small (4 KB), we don't get significant benefit at
Expand Down Expand Up @@ -163,6 +167,7 @@ func buildTableOptions(opt Options) table.Options {
TableSize: uint64(opt.MaxTableSize),
BlockSize: opt.BlockSize,
BloomFalsePositive: opt.BloomFalsePositive,
LoadBloomsOnOpen: opt.LoadBloomsOnOpen,
LoadingMode: opt.TableLoadingMode,
ChkMode: opt.ChecksumVerificationMode,
Compression: opt.Compression,
Expand Down Expand Up @@ -593,3 +598,32 @@ func (opt Options) WithBypassLockGuard(b bool) Options {
opt.BypassLockGuard = b
return opt
}

// WithMaxBfCacheSize returns a new Options value with MaxBfCacheSize set to the given value.
//
// This value specifies how much memory should be used by the bloom filters.
// Badger uses bloom filters to speed up lookups. Each table has its own bloom
// filter, and each bloom filter is approximately 5 MB.
//
// The default value of MaxBfCacheSize is 0, which disables the bloom filter
// cache; all bloom filters are then kept in memory instead.
func (opt Options) WithMaxBfCacheSize(size int64) Options {
	opt.MaxBfCacheSize = size
	return opt
}

// WithLoadBloomsOnOpen returns a new Options value with LoadBloomsOnOpen set to the given value.
//
// Badger uses bloom filters to speed up key lookups. When LoadBloomsOnOpen is
// set to false, all bloom filters will be loaded on DB open. This is supposed
// to improve the read speed but it will affect the time taken to open the DB.
// Set this option to true to reduce the time taken to open the DB.
//
// The default value of LoadBloomsOnOpen is true.
func (opt Options) WithLoadBloomsOnOpen(b bool) Options {
	opt.LoadBloomsOnOpen = b
	return opt
}
1 change: 1 addition & 0 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
opts := buildTableOptions(w.db.opt)
opts.DataKey = builder.DataKey()
opts.Cache = w.db.blockCache
opts.BfCache = w.db.bfCache
var tbl *table.Table
if w.db.opt.InMemory {
var err error
Expand Down
40 changes: 22 additions & 18 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,15 @@ type Options struct {
// Compression indicates the compression algorithm used for block compression.
Compression options.CompressionType

Cache *ristretto.Cache
Cache *ristretto.Cache
BfCache *ristretto.Cache

// ZSTDCompressionLevel is the ZSTD compression level used for compressing blocks.
ZSTDCompressionLevel int

// When LoadBloomsOnOpen is set, bloom filters will be read only when they are accessed.
// Otherwise they will be loaded on table open.
LoadBloomsOnOpen bool
}

// TableInterface is useful for testing.
Expand All @@ -93,7 +98,8 @@ type Table struct {
tableSize int // Initialized in OpenTable, using fd.Stat().

blockIndex []*pb.BlockOffset
ref int32 // For file garbage collection. Atomic.
ref int32 // For file garbage collection. Atomic.
bf *z.Bloom // Nil if BfCache is set.

mmap []byte // Memory mapped.

Expand Down Expand Up @@ -156,7 +162,7 @@ func (t *Table) DecrRef() error {
t.opt.Cache.Del(t.blockCacheKey(i))
}
// Delete bloom filter from the cache.
t.opt.Cache.Del(t.bfCacheKey())
t.opt.BfCache.Del(t.bfCacheKey())

}
return nil
Expand Down Expand Up @@ -374,16 +380,10 @@ func (t *Table) readIndex() error {
t.estimatedSize = index.EstimatedSize
t.blockIndex = index.Offsets

// Avoid the cost of unmarshalling the bloom filters if the cache is absent.
if t.opt.Cache != nil {
var bf *z.Bloom
if bf, err = z.JSONUnmarshal(index.BloomFilter); err != nil {
return y.Wrapf(err, "failed to unmarshal bloom filter for the table %d in Table.readIndex",
t.id)
}

t.opt.Cache.Set(t.bfCacheKey(), bf, int64(len(index.BloomFilter)))
if !t.opt.LoadBloomsOnOpen {
t.bf, _ = t.readBloomFilter()
}

return nil
}

Expand Down Expand Up @@ -509,20 +509,24 @@ func (t *Table) ID() uint64 { return t.id }
func (t *Table) DoesNotHave(hash uint64) bool {
var bf *z.Bloom

// Return fast if cache is absent.
if t.opt.Cache == nil {
bf, _ := t.readBloomFilter()
return !bf.Has(hash)
// Return fast if the cache is absent.
if t.opt.BfCache == nil {
// Load bloomfilter into memory if the cache is absent.
if t.bf == nil {
y.AssertTrue(t.opt.LoadBloomsOnOpen)
t.bf, _ = t.readBloomFilter()
}
return !t.bf.Has(hash)
}

// Check if the bloomfilter exists in the cache.
if b, ok := t.opt.Cache.Get(t.bfCacheKey()); b != nil && ok {
if b, ok := t.opt.BfCache.Get(t.bfCacheKey()); b != nil && ok {
bf = b.(*z.Bloom)
return !bf.Has(hash)
}

bf, sz := t.readBloomFilter()
t.opt.Cache.Set(t.bfCacheKey(), bf, int64(sz))
t.opt.BfCache.Set(t.bfCacheKey(), bf, int64(sz))
return !bf.Has(hash)
}

Expand Down

0 comments on commit 77308f2

Please sign in to comment.