Skip to content

Commit

Permalink
Enable garbage collection of Badger DBs
Browse files Browse the repository at this point in the history
Co-authored-by: Alvaro Alda <[email protected]>
Co-authored-by: pancho horrilo <[email protected]>
  • Loading branch information
3 people committed Nov 21, 2018
1 parent cc0e759 commit 9e9dd6a
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 19 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module github.com/bbva/qed

require (
github.com/bbva/raft-badger v0.1.0
github.com/bbva/raft-badger v0.1.1
github.com/boltdb/bolt v1.3.1 // indirect
github.com/coreos/bbolt v1.3.0
github.com/dgraph-io/badger v1.5.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/bbva/raft-badger v0.0.0-20180802070646-9b665fa3e1ff h1:bkOdo7w7F6eieF
github.com/bbva/raft-badger v0.0.0-20180802070646-9b665fa3e1ff/go.mod h1:hy/EU1/708ip3lfr52RzJbSRc6RCtrNpnuTDrIMolr4=
github.com/bbva/raft-badger v0.1.0 h1:2Rs2cE84pYaqE4IqI6ZZr/wXUvetZ8jV4doOH0Yaw8k=
github.com/bbva/raft-badger v0.1.0/go.mod h1:/XINu34Us6PULTVQn0D6I/WtniWHHjut9lnsfFVECDA=
github.com/bbva/raft-badger v0.1.1 h1:v0BlEP2glTd3o1U4ShD4HtyGc08PXt4gcEBdO6Fh7Oc=
github.com/bbva/raft-badger v0.1.1/go.mod h1:KqKb1IrW6hsgFSzXTavxddJH+5E3TmDxBbFhitk0vpY=
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/coreos/bbolt v1.3.0 h1:HIgH5xUWXT914HCI671AxuTTqjj64UOFr7pHn48LUTI=
Expand Down
4 changes: 2 additions & 2 deletions raftwal/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type RaftBalloon struct {
func NewRaftBalloon(path, addr, id string, store storage.ManagedStore) (*RaftBalloon, error) {

// Create the log store and stable store
badgerLogStore, err := raftbadger.New(raftbadger.Options{Path: path + "/logs", NoSync: true}) // raftbadger.NewBadgerStore(path + "/logs")
badgerLogStore, err := raftbadger.New(raftbadger.Options{Path: path + "/logs", NoSync: true, ValueLogGC: true}) // raftbadger.NewBadgerStore(path + "/logs")
if err != nil {
return nil, fmt.Errorf("new badger store: %s", err)
}
Expand All @@ -102,7 +102,7 @@ func NewRaftBalloon(path, addr, id string, store storage.ManagedStore) (*RaftBal
return nil, fmt.Errorf("new cached store: %s", err)
}

stableStore, err := raftbadger.New(raftbadger.Options{Path: path + "/config", NoSync: true}) // raftbadger.NewBadgerStore(path + "/config")
stableStore, err := raftbadger.New(raftbadger.Options{Path: path + "/config", NoSync: true, ValueLogGC: true}) // raftbadger.NewBadgerStore(path + "/config")
if err != nil {
return nil, fmt.Errorf("new badger store: %s", err)
}
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewServer(
}

// Open badger store
store, err := badger.NewBadgerStore(dbPath)
store, err := badger.NewBadgerStoreOpts(&badger.Options{Path: dbPath, ValueLogGC: true})
if err != nil {
return nil, err
}
Expand Down
127 changes: 113 additions & 14 deletions storage/badger/badger_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,101 @@ import (
"bytes"
"encoding/binary"
"io"
"log"

"time"

b "github.com/dgraph-io/badger"
bo "github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/protos"
"github.com/dgraph-io/badger/y"

"github.com/bbva/qed/log"
"github.com/bbva/qed/storage"
)

type BadgerStore struct {
db *b.DB
db *b.DB
vlogTicker *time.Ticker // runs every 1m, check size of vlog and run GC conditionally.
mandatoryVlogTicker *time.Ticker // runs every 10m, we always run vlog GC.
}

// Options contains all the configuration used to open the Badger db
type Options struct {
// Path is the directory path to the Badger db to use.
Path string

// BadgerOptions contains any specific Badger options you might
// want to specify.
BadgerOptions *b.Options

// NoSync causes the database to skip fsync calls after each
// write to the log. This is unsafe, so it should be used
// with caution.
NoSync bool

// ValueLogGC enables a periodic goroutine that does a garbage
// collection of the value log while the underlying Badger is online.
ValueLogGC bool

// GCInterval is the interval between conditionally running the garbage
// collection process, based on the size of the vlog. By default, runs every 1m.
GCInterval time.Duration

// GCInterval is the interval between mandatory running the garbage
// collection process. By default, runs every 10m.
MandatoryGCInterval time.Duration

// GCThreshold sets threshold in bytes for the vlog size to be included in the
// garbage collection cycle. By default, 1GB.
GCThreshold int64
}

func NewBadgerStore(path string) (*BadgerStore, error) {
opts := b.DefaultOptions
opts.TableLoadingMode = bo.MemoryMap
opts.ValueLogLoadingMode = bo.FileIO
opts.Dir = path
opts.ValueDir = path
opts.SyncWrites = false

return NewBadgerStoreOpts(path, opts)
return NewBadgerStoreOpts(&Options{Path: path})
}

func NewBadgerStoreOpts(path string, opts b.Options) (*BadgerStore, error) {
db, err := b.Open(opts)
func NewBadgerStoreOpts(opts *Options) (*BadgerStore, error) {

var bOpts b.Options
if bOpts = b.DefaultOptions; opts.BadgerOptions != nil {
bOpts = *opts.BadgerOptions
}

bOpts.TableLoadingMode = bo.MemoryMap
bOpts.ValueLogLoadingMode = bo.FileIO
bOpts.Dir = opts.Path
bOpts.ValueDir = opts.Path
bOpts.SyncWrites = false

db, err := b.Open(bOpts)
if err != nil {
return nil, err
}
return &BadgerStore{db}, nil

store := &BadgerStore{db: db}
// Start GC routine
if opts.ValueLogGC {

var gcInterval time.Duration
var mandatoryGCInterval time.Duration
var threshold int64

if gcInterval = 1 * time.Minute; opts.GCInterval != 0 {
gcInterval = opts.GCInterval
}
if mandatoryGCInterval = 10 * time.Minute; opts.MandatoryGCInterval != 0 {
mandatoryGCInterval = opts.MandatoryGCInterval
}
if threshold = int64(1 << 30); opts.GCThreshold != 0 {
threshold = opts.GCThreshold
}

store.vlogTicker = time.NewTicker(gcInterval)
store.mandatoryVlogTicker = time.NewTicker(mandatoryGCInterval)
go store.runVlogGC(db, threshold)
}

return store, nil
}

func (s BadgerStore) Mutate(mutations []*storage.Mutation) error {
Expand Down Expand Up @@ -196,6 +260,12 @@ func (s BadgerStore) GetAll(prefix byte) storage.KVPairReader {
}

func (s BadgerStore) Close() error {
if s.vlogTicker != nil {
s.vlogTicker.Stop()
}
if s.mandatoryVlogTicker != nil {
s.mandatoryVlogTicker.Stop()
}
return s.db.Close()
}

Expand Down Expand Up @@ -237,7 +307,7 @@ func (s *BadgerStore) Backup(w io.Writer, until uint64) error {
}
val, err := item.Value()
if err != nil {
log.Printf("Key [%x]. Error while fetching value [%v]\n", item.Key(), err)
log.Infof("Key [%x]. Error while fetching value [%v]\n", item.Key(), err)
continue
}

Expand Down Expand Up @@ -282,3 +352,32 @@ func (s *BadgerStore) GetLastVersion() (uint64, error) {
})
return version, err
}

func (b *BadgerStore) runVlogGC(db *b.DB, threshold int64) {
// Get initial size on start.
_, lastVlogSize := db.Size()

runGC := func() {
var err error
for err == nil {
// If a GC is successful, immediately run it again.
log.Debug("VlogGC task: running...")
err = db.RunValueLogGC(0.7)
}
log.Debug("VlogGC task: done.")
_, lastVlogSize = db.Size()
}

for {
select {
case <-b.vlogTicker.C:
_, currentVlogSize := db.Size()
if currentVlogSize < lastVlogSize+threshold {
continue
}
runGC()
case <-b.mandatoryVlogTicker.C:
runGC()
}
}
}
4 changes: 3 additions & 1 deletion tests/memory/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
)

func newBadgerTest(path string) (*badger.BadgerStore, error) {

opts := b.DefaultOptions
opts.Dir = path
opts.ValueDir = path
Expand All @@ -52,7 +53,8 @@ func newBadgerTest(path string) (*badger.BadgerStore, error) {

opts.SyncWrites = false

return badger.NewBadgerStoreOpts(path, opts)
storeOptions := &badger.Options{Path: path, BadgerOptions: &opts}
return badger.NewBadgerStoreOpts(storeOptions)
}

func cleanup(db *b.DB) {
Expand Down

0 comments on commit 9e9dd6a

Please sign in to comment.