Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add separate cache for bloomfilters #1260

Merged
merged 5 commits into from
Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type DB struct {
pub *publisher
registry *KeyRegistry
blockCache *ristretto.Cache
bfCache *ristretto.Cache
}

const (
Expand Down Expand Up @@ -304,7 +305,21 @@ func Open(opt Options) (db *DB, err error) {
}
db.blockCache, err = ristretto.NewCache(&config)
if err != nil {
return nil, errors.Wrap(err, "failed to create cache")
return nil, errors.Wrap(err, "failed to create data cache")
}
}

if opt.MaxBfCacheSize > 0 {
config := ristretto.Config{
// Use 5% of cache memory for storing counters.
NumCounters: int64(float64(opt.MaxBfCacheSize) * 0.05 * 2),
MaxCost: int64(float64(opt.MaxBfCacheSize) * 0.95),
BufferItems: 64,
Metrics: true,
}
db.bfCache, err = ristretto.NewCache(&config)
if err != nil {
return nil, errors.Wrap(err, "failed to create bf cache")
}
}

Expand Down Expand Up @@ -393,14 +408,22 @@ func Open(opt Options) (db *DB, err error) {
return db, nil
}

// CacheMetrics returns the metrics for the underlying cache.
func (db *DB) CacheMetrics() *ristretto.Metrics {
// DataCacheMetrics returns the metrics for the underlying data cache.
// It returns nil when the data (block) cache is disabled.
func (db *DB) DataCacheMetrics() *ristretto.Metrics {
	if db.blockCache == nil {
		return nil
	}
	return db.blockCache.Metrics
}

// BfCacheMetrics returns the metrics for the underlying bloom filter cache.
// It returns nil when the bloom filter cache is disabled.
func (db *DB) BfCacheMetrics() *ristretto.Metrics {
	if db.bfCache == nil {
		return nil
	}
	return db.bfCache.Metrics
}

// Close closes a DB. It's crucial to call it to ensure all the pending updates make their way to
// disk. Calling DB.Close() multiple times would still only close the DB once.
func (db *DB) Close() error {
Expand Down Expand Up @@ -490,6 +513,7 @@ func (db *DB) close() (err error) {
db.closers.updateSize.SignalAndWait()
db.orc.Stop()
db.blockCache.Close()
db.bfCache.Close()

db.elog.Finish()
if db.opt.InMemory {
Expand Down Expand Up @@ -957,6 +981,7 @@ func (db *DB) handleFlushTask(ft flushTask) error {
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.Cache = db.blockCache
bopts.BfCache = db.bfCache
tableData := buildL0Table(ft, bopts)

fileID := db.lc.reserveFileID()
Expand Down Expand Up @@ -1515,6 +1540,7 @@ func (db *DB) dropAll() (func(), error) {
db.lc.nextFileID = 1
db.opt.Infof("Deleted %d value log files. DropAll done.\n", num)
db.blockCache.Clear()
db.bfCache.Clear()

return resume, nil
}
Expand Down
2 changes: 2 additions & 0 deletions levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
topt.Compression = tf.Compression
topt.DataKey = dk
topt.Cache = db.blockCache
topt.BfCache = db.bfCache
t, err := table.OpenTable(fd, topt)
if err != nil {
if strings.HasPrefix(err.Error(), "CHECKSUM_MISMATCH:") {
Expand Down Expand Up @@ -530,6 +531,7 @@ func (s *levelsController) compactBuildTables(
bopts.DataKey = dk
// Builder does not need cache but the same options are used for opening table.
bopts.Cache = s.kv.blockCache
bopts.BfCache = s.kv.bfCache
builder := table.NewTableBuilder(bopts)
var numKeys, numSkips uint64
for ; it.Valid(); it.Next() {
Expand Down
34 changes: 34 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type Options struct {
BloomFalsePositive float64
KeepL0InMemory bool
MaxCacheSize int64
MaxBfCacheSize int64
LoadBloomsOnOpen bool

NumLevelZeroTables int
NumLevelZeroTablesStall int
Expand Down Expand Up @@ -130,6 +132,8 @@ func DefaultOptions(path string) Options {
VerifyValueChecksum: false,
Compression: options.None,
MaxCacheSize: 1 << 30, // 1 GB
MaxBfCacheSize: 0,
LoadBloomsOnOpen: true,
// The following benchmarks were done on a 4 KB block size (default block size). The
compression ratio is supposed to increase with increasing compression level but since the
// input for compression algorithm is small (4 KB), we don't get significant benefit at
Expand Down Expand Up @@ -163,6 +167,7 @@ func buildTableOptions(opt Options) table.Options {
TableSize: uint64(opt.MaxTableSize),
BlockSize: opt.BlockSize,
BloomFalsePositive: opt.BloomFalsePositive,
LoadBloomsOnOpen: opt.LoadBloomsOnOpen,
LoadingMode: opt.TableLoadingMode,
ChkMode: opt.ChecksumVerificationMode,
Compression: opt.Compression,
Expand Down Expand Up @@ -593,3 +598,32 @@ func (opt Options) WithBypassLockGuard(b bool) Options {
opt.BypassLockGuard = b
return opt
}

// WithMaxBfCacheSize returns a new Options value with MaxBfCacheSize set to the given value.
//
// This value specifies how much memory should be used by the bloom filters.
// Badger uses bloom filters to speed up lookups. Each table has its own bloom
// filter and each bloom filter is approximately 5 MB.
//
// The default value of MaxBfCacheSize is 0, which disables the cache and
// keeps all the bloom filters in memory.
func (opt Options) WithMaxBfCacheSize(size int64) Options {
opt.MaxBfCacheSize = size
return opt
}

// WithLoadBloomsOnOpen returns a new Options value with LoadBloomsOnOpen set to the given value.
//
// Badger uses bloom filters to speed up key lookups. When LoadBloomsOnOpen is set
// to false, all bloom filters will be loaded on DB open. This is supposed to
// improve the read speed but it will affect the time taken to open the DB. Set
// this option to true to reduce the time taken to open the DB.
//
// The default value of LoadBloomsOnOpen is true (see DefaultOptions).
func (opt Options) WithLoadBloomsOnOpen(b bool) Options {
opt.LoadBloomsOnOpen = b
return opt
}
1 change: 1 addition & 0 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
opts := buildTableOptions(w.db.opt)
opts.DataKey = builder.DataKey()
opts.Cache = w.db.blockCache
opts.BfCache = w.db.bfCache
var tbl *table.Table
if w.db.opt.InMemory {
var err error
Expand Down
40 changes: 22 additions & 18 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,15 @@ type Options struct {
// Compression indicates the compression algorithm used for block compression.
Compression options.CompressionType

Cache *ristretto.Cache
Cache *ristretto.Cache
BfCache *ristretto.Cache

// ZSTDCompressionLevel is the ZSTD compression level used for compressing blocks.
ZSTDCompressionLevel int

// When LoadBloomsOnOpen is set, bloom filters will be read only when they are accessed.
// Otherwise they will be loaded on table open.
LoadBloomsOnOpen bool
}

// TableInterface is useful for testing.
Expand All @@ -93,7 +98,8 @@ type Table struct {
tableSize int // Initialized in OpenTable, using fd.Stat().

blockIndex []*pb.BlockOffset
ref int32 // For file garbage collection. Atomic.
ref int32 // For file garbage collection. Atomic.
bf *z.Bloom // Nil if BfCache is set.

mmap []byte // Memory mapped.

Expand Down Expand Up @@ -156,7 +162,7 @@ func (t *Table) DecrRef() error {
t.opt.Cache.Del(t.blockCacheKey(i))
}
// Delete bloom filter from the cache.
t.opt.Cache.Del(t.bfCacheKey())
t.opt.BfCache.Del(t.bfCacheKey())

}
return nil
Expand Down Expand Up @@ -374,16 +380,10 @@ func (t *Table) readIndex() error {
t.estimatedSize = index.EstimatedSize
t.blockIndex = index.Offsets

// Avoid the cost of unmarshalling the bloom filters if the cache is absent.
if t.opt.Cache != nil {
var bf *z.Bloom
if bf, err = z.JSONUnmarshal(index.BloomFilter); err != nil {
return y.Wrapf(err, "failed to unmarshal bloom filter for the table %d in Table.readIndex",
t.id)
}

t.opt.Cache.Set(t.bfCacheKey(), bf, int64(len(index.BloomFilter)))
if !t.opt.LoadBloomsOnOpen {
t.bf, _ = t.readBloomFilter()
}

return nil
}

Expand Down Expand Up @@ -509,20 +509,24 @@ func (t *Table) ID() uint64 { return t.id }
func (t *Table) DoesNotHave(hash uint64) bool {
var bf *z.Bloom

// Return fast if cache is absent.
if t.opt.Cache == nil {
bf, _ := t.readBloomFilter()
return !bf.Has(hash)
// Return fast if the cache is absent.
if t.opt.BfCache == nil {
// Load bloomfilter into memory if the cache is absent.
if t.bf == nil {
y.AssertTrue(t.opt.LoadBloomsOnOpen)
t.bf, _ = t.readBloomFilter()
}
return !t.bf.Has(hash)
}

// Check if the bloomfilter exists in the cache.
if b, ok := t.opt.Cache.Get(t.bfCacheKey()); b != nil && ok {
if b, ok := t.opt.BfCache.Get(t.bfCacheKey()); b != nil && ok {
bf = b.(*z.Bloom)
return !bf.Has(hash)
}

bf, sz := t.readBloomFilter()
t.opt.Cache.Set(t.bfCacheKey(), bf, int64(sz))
t.opt.BfCache.Set(t.bfCacheKey(), bf, int64(sz))
return !bf.Has(hash)
}

Expand Down