Replace eventLog with Logger #1203

Merged · 10 commits · Apr 13, 2020

Changes from 2 commits
41 changes: 16 additions & 25 deletions db.go
@@ -38,7 +38,6 @@ import (
 	"github.com/dgraph-io/ristretto"
 	humanize "github.com/dustin/go-humanize"
 	"github.com/pkg/errors"
-	"golang.org/x/net/trace"
 )

 var (
@@ -68,7 +67,6 @@ type DB struct {
 	valueDirGuard *directoryLockGuard

 	closers closers
-	elog trace.EventLog
 	mt *skl.Skiplist // Our latest (actively written) in-memory table
 	imm []*skl.Skiplist // Add here only AFTER pushing to flushChan.
 	opt Options
@@ -109,7 +107,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {

 	toLSM := func(nk []byte, vs y.ValueStruct) {
 		for err := db.ensureRoomForWrite(); err != nil; err = db.ensureRoomForWrite() {
-			db.elog.Printf("Replay: Making room for writes")
+			db.opt.Debugf("Replay: Making room for writes")
 			time.Sleep(10 * time.Millisecond)
 		}
 		db.mt.Put(nk, vs)
@@ -118,7 +116,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
 	first := true
 	return func(e Entry, vp valuePointer) error { // Function for replaying.
 		if first {
-			db.elog.Printf("First key=%q\n", e.Key)
+			db.opt.Debugf("First key=%q\n", e.Key)
 		}
 		first = false
 		db.orc.Lock()
@@ -273,18 +271,12 @@ func Open(opt Options) (db *DB, err error) {
 		}
 	}()

-	elog := y.NoEventLog
-	if opt.EventLogging {
-		elog = trace.NewEventLog("Badger", "DB")
-	}
-
 	db = &DB{
 		imm: make([]*skl.Skiplist, 0, opt.NumMemtables),
 		flushChan: make(chan flushTask, opt.NumMemtables),
 		writeCh: make(chan *request, kvWriteChCapacity),
 		opt: opt,
 		manifest: manifestFile,
-		elog: elog,
 		dirLockGuard: dirLockGuard,
 		valueDirGuard: valueDirLockGuard,
 		orc: newOracle(opt),
@@ -405,7 +397,7 @@ func (db *DB) Close() error {
 }

 func (db *DB) close() (err error) {
-	db.elog.Printf("Closing database")
+	db.opt.Debugf("Closing database")

 	atomic.StoreInt32(&db.blockWrites, 1)

@@ -433,7 +425,7 @@ func (db *DB) close() (err error) {
 	// trying to push stuff into the memtable. This will also resolve the value
 	// offset problem: as we push into memtable, we update value offsets there.
 	if !db.mt.Empty() {
-		db.elog.Printf("Flushing memtable")
+		db.opt.Debugf("Flushing memtable")
 		for {
 			pushedFlushTask := func() bool {
 				db.Lock()
@@ -443,7 +435,7 @@ func (db *DB) close() (err error) {
 			case db.flushChan <- flushTask{mt: db.mt, vptr: db.vhead}:
 				db.imm = append(db.imm, db.mt) // Flusher will attempt to remove this from s.imm.
 				db.mt = nil // Will segfault if we try writing!
-				db.elog.Printf("pushed to flush chan\n")
+				db.opt.Debugf("pushed to flush chan\n")
 				return true
 			default:
 				// If we fail to push, we need to unlock and wait for a short while.
@@ -479,12 +471,11 @@ func (db *DB) close() (err error) {
 	if lcErr := db.lc.close(); err == nil {
 		err = errors.Wrap(lcErr, "DB.Close")
 	}
-	db.elog.Printf("Waiting for closer")
+	db.opt.Debugf("Waiting for closer")
 	db.closers.updateSize.SignalAndWait()
 	db.orc.Stop()
 	db.blockCache.Close()

-	db.elog.Finish()
 	if db.opt.InMemory {
 		return
 	}
@@ -682,16 +673,16 @@ func (db *DB) writeRequests(reqs []*request) error {
 			r.Wg.Done()
 		}
 	}
-	db.elog.Printf("writeRequests called. Writing to value log")
+	db.opt.Debugf("writeRequests called. Writing to value log")
 	err := db.vlog.write(reqs)
 	if err != nil {
 		done(err)
 		return err
 	}

-	db.elog.Printf("Sending updates to subscribers")
+	db.opt.Debugf("Sending updates to subscribers")
 	db.pub.sendUpdates(reqs)
-	db.elog.Printf("Writing to memtable")
+	db.opt.Debugf("Writing to memtable")
 	var count int
 	for _, b := range reqs {
 		if len(b.Entries) == 0 {
@@ -702,7 +693,7 @@ func (db *DB) writeRequests(reqs []*request) error {
 		for err = db.ensureRoomForWrite(); err == errNoRoom; err = db.ensureRoomForWrite() {
 			i++
 			if i%100 == 0 {
-				db.elog.Printf("Making room for writes")
+				db.opt.Debugf("Making room for writes")
 			}
 			// We need to poll a bit because both hasRoomForWrite and the flusher need access to s.imm.
 			// When flushChan is full and you are blocked there, and the flusher is trying to update s.imm,
@@ -720,7 +711,7 @@ func (db *DB) writeRequests(reqs []*request) error {
 		db.updateHead(b.Ptrs)
 	}
 	done(nil)
-	db.elog.Printf("%d entries written", count)
+	db.opt.Debugf("%d entries written", count)
 	return nil
 }

@@ -934,7 +925,7 @@ func (db *DB) handleFlushTask(ft flushTask) error {

 	// Store badger head even if vptr is zero, need it for readTs
 	db.opt.Debugf("Storing value log head: %+v\n", ft.vptr)
-	db.elog.Printf("Storing offset: %+v\n", ft.vptr)
+	db.opt.Debugf("Storing offset: %+v\n", ft.vptr)
 	val := ft.vptr.Encode()

 	// Pick the max commit ts, so in case of crash, our read ts would be higher than all the
@@ -971,17 +962,17 @@ func (db *DB) handleFlushTask(ft flushTask) error {
 	go func() { dirSyncCh <- db.syncDir(db.opt.Dir) }()

 	if _, err = fd.Write(tableData); err != nil {
-		db.elog.Errorf("ERROR while writing to level 0: %v", err)
+		db.opt.Errorf("ERROR while writing to level 0: %v", err)
 		return err
 	}

 	if dirSyncErr := <-dirSyncCh; dirSyncErr != nil {
 		// Do dir sync as best effort. No need to return due to an error there.
-		db.elog.Errorf("ERROR while syncing level directory: %v", dirSyncErr)
+		db.opt.Errorf("ERROR while syncing level directory: %v", dirSyncErr)
 	}
 	tbl, err := table.OpenTable(fd, bopts)
 	if err != nil {
-		db.elog.Printf("ERROR while opening table: %v", err)
+		db.opt.Debugf("ERROR while opening table: %v", err)
 		return err
 	}
 	// We own a ref on tbl.
@@ -1063,7 +1054,7 @@ func (db *DB) calculateSize() {
 		return nil
 	})
 	if err != nil {
-		db.elog.Printf("Got error while calculating total size of directory: %s", dir)
+		db.opt.Debugf("Got error while calculating total size of directory: %s", dir)
 	}
 	return lsmSize, vlogSize
 }
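Note: the `db.opt.Debugf` and `db.opt.Errorf` calls that replace `elog` above go through logging helpers defined on `Options`. A minimal sketch of that forwarding pattern, assuming the helpers simply delegate to the configured `Logger` and drop messages when it is nil (the exact implementation lives in Badger's logger.go and may differ):

```go
// Sketch of the Options logging helpers assumed by the call sites above.
// Debugf forwards to the user-configured Logger; a nil Logger disables output.
func (opt *Options) Debugf(format string, v ...interface{}) {
	if opt.Logger == nil {
		return
	}
	opt.Logger.Debugf(format, v...)
}

// Errorf follows the same pattern at error level.
func (opt *Options) Errorf(format string, v ...interface{}) {
	if opt.Logger == nil {
		return
	}
	opt.Logger.Errorf(format, v...)
}
```

This is what lets the single `Options.Logger` knob subsume the old `EventLogging` flag: silencing the logger silences every call site in this diff.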
1 change: 1 addition & 0 deletions go.sum
@@ -13,6 +13,7 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgraph-io/badger v1.6.0 h1:DshxFxZWXUcO0xX476VJC07Xsr6ZCBVRHKZ93Oh7Evo=
 github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3 h1:MQLRM35Pp0yAyBYksjbj1nZI/w6eyRY/mWoM1sFf4kU=
 github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
 github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
12 changes: 5 additions & 7 deletions levels.go
@@ -37,7 +37,6 @@ import (

 type levelsController struct {
 	nextFileID uint64 // Atomic
-	elog trace.EventLog

 	// The following are initialized once and const.
 	levels []*levelHandler
@@ -62,7 +61,7 @@ func revertToManifest(kv *DB, mf *Manifest, idMap map[uint64]struct{}) error {
 	// 2. Delete files that shouldn't exist.
 	for id := range idMap {
 		if _, ok := mf.Tables[id]; !ok {
-			kv.elog.Printf("Table file %d not referenced in MANIFEST\n", id)
+			kv.opt.Debugf("Table file %d not referenced in MANIFEST\n", id)
 			filename := table.NewFilename(id, kv.opt.Dir)
 			if err := os.Remove(filename); err != nil {
 				return y.Wrapf(err, "While removing table %d", id)
@@ -77,7 +76,6 @@ func newLevelsController(db *DB, mf *Manifest) (*levelsController, error) {
 	y.AssertTrue(db.opt.NumLevelZeroTablesStall > db.opt.NumLevelZeroTables)
 	s := &levelsController{
 		kv: db,
-		elog: db.elog,
 		levels: make([]*levelHandler, db.opt.MaxLevels),
 	}
 	s.cstatus.levels = make([]*levelCompactStatus, db.opt.MaxLevels)
@@ -926,10 +924,10 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
 	// Stall. Make sure all levels are healthy before we unstall.
 	var timeStart time.Time
 	{
-		s.elog.Printf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
+		s.kv.opt.Debugf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
 		s.cstatus.RLock()
 		for i := 0; i < s.kv.opt.MaxLevels; i++ {
-			s.elog.Printf("level=%d. Status=%s Size=%d\n",
+			s.kv.opt.Debugf("level=%d. Status=%s Size=%d\n",
 				i, s.cstatus.levels[i].debug(), s.levels[i].getTotalSize())
 		}
 		s.cstatus.RUnlock()
@@ -947,12 +945,12 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
 		time.Sleep(10 * time.Millisecond)
 		if i%100 == 0 {
 			prios := s.pickCompactLevels()
-			s.elog.Printf("Waiting to add level 0 table. Compaction priorities: %+v\n", prios)
+			s.kv.opt.Debugf("Waiting to add level 0 table. Compaction priorities: %+v\n", prios)
 			i = 0
 		}
 	}
 	{
-		s.elog.Printf("UNSTALLED UNSTALLED UNSTALLED: %v\n", time.Since(timeStart))
+		s.kv.opt.Debugf("UNSTALLED UNSTALLED UNSTALLED: %v\n", time.Since(timeStart))
 		s.lastUnstalled = time.Now()
 	}
 }
1 change: 0 additions & 1 deletion levels_test.go
@@ -548,7 +548,6 @@ func TestL0Stall(t *testing.T) {
 	}

 	opt := DefaultOptions("")
-	opt.EventLogging = false
 	// Disable all compactions.
 	opt.NumCompactors = 0
 	// Number of level zero tables.
4 changes: 4 additions & 0 deletions logger.go
@@ -68,6 +68,10 @@ type defaultLog struct {

 var defaultLogger = &defaultLog{Logger: log.New(os.Stderr, "badger ", log.LstdFlags)}

+func DefaultLogger() *defaultLog {
+	return defaultLogger
+}
+
 func (l *defaultLog) Errorf(f string, v ...interface{}) {
 	l.Printf("ERROR: "+f, v...)
 }
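The newly exported `DefaultLogger()` gives callers outside the package a handle on the stock stderr logger. A hypothetical usage sketch (the path and option chain are illustrative):

```go
package main

import (
	"log"

	badger "github.com/dgraph-io/badger/v2"
)

func main() {
	// Explicitly select the stock stderr logger; passing nil to
	// WithLogger would instead silence Badger's logging entirely.
	opt := badger.DefaultOptions("/tmp/badger").WithLogger(badger.DefaultLogger())

	db, err := badger.Open(opt)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```

One wrinkle worth flagging in review: `DefaultLogger` returns the unexported `*defaultLog` type, which golint reports as an exported function returning an unexported type. Returning the `Logger` interface would avoid that.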
12 changes: 0 additions & 12 deletions options.go
@@ -48,7 +48,6 @@ type Options struct {
 	Truncate bool
 	Logger Logger
 	Compression options.CompressionType
-	EventLogging bool
 	InMemory bool

 	// Fine tuning options.
@@ -147,7 +146,6 @@ func DefaultOptions(path string) Options {
 		Truncate: false,
 		Logger: defaultLogger,
 		LogRotatesToFlush: 2,
-		EventLogging: true,
 		EncryptionKey: []byte{},
 		EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days.
 	}
@@ -283,16 +281,6 @@ func (opt Options) WithLogger(val Logger) Options {
 	return opt
 }

-// WithEventLogging returns a new Options value with EventLogging set to the given value.
-//
-// EventLogging provides a way to enable or disable trace.EventLog logging.
-//
-// The default value of EventLogging is true.
-func (opt Options) WithEventLogging(enabled bool) Options {
-	opt.EventLogging = enabled
-	return opt
-}
-
 // WithMaxTableSize returns a new Options value with MaxTableSize set to the given value.
 //
 // MaxTableSize sets the maximum size in bytes for each LSM table or file.
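With `WithEventLogging` gone, callers that previously toggled event logging would presumably migrate to controlling `Options.Logger` directly. A hypothetical before/after sketch (`dir` and `myLogger` are placeholders):

```go
// Before (old API): disable the trace.EventLog instrumentation.
// opt := badger.DefaultOptions(dir).WithEventLogging(false)

// After: verbosity is governed entirely by the Logger option.
opt := badger.DefaultOptions(dir).WithLogger(nil) // nil disables Badger's logging

// Or keep logging but route it through any type implementing the
// badger.Logger interface (Errorf/Warningf/Infof/Debugf).
opt = badger.DefaultOptions(dir).WithLogger(myLogger)
```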
4 changes: 2 additions & 2 deletions txn.go
@@ -70,8 +70,8 @@ func newOracle(opt Options) *oracle {
 		txnMark: &y.WaterMark{Name: "badger.TxnTimestamp"},
 		closer: y.NewCloser(2),
 	}
-	orc.readMark.Init(orc.closer, opt.EventLogging)
-	orc.txnMark.Init(orc.closer, opt.EventLogging)
+	orc.readMark.Init(orc.closer, opt)
+	orc.txnMark.Init(orc.closer, opt)
 	return orc
 }

15 changes: 5 additions & 10 deletions value.go
@@ -827,7 +827,6 @@ type lfDiscardStats struct {

 type valueLog struct {
 	dirPath string
-	elog trace.EventLog

 	// guards our view of which files exist, which to be deleted, how many active iterators
 	filesLock sync.RWMutex
@@ -1064,10 +1063,7 @@ func (vlog *valueLog) open(db *DB, ptr valuePointer, replayFn logEntry) error {
 		return nil
 	}
 	vlog.dirPath = vlog.opt.ValueDir
-	vlog.elog = y.NoEventLog
-	if vlog.opt.EventLogging {
-		vlog.elog = trace.NewEventLog("Badger", "Valuelog")
-	}
+
 	vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
 	vlog.lfDiscardStats = &lfDiscardStats{
 		m: make(map[uint32]int64),
@@ -1212,8 +1208,7 @@ func (vlog *valueLog) Close() error {
 	// close flushDiscardStats.
 	vlog.lfDiscardStats.closer.SignalAndWait()

-	vlog.elog.Printf("Stopping garbage collection of values.")
-	defer vlog.elog.Finish()
+	vlog.opt.Debugf("Stopping garbage collection of values.")

 	var err error
 	for id, f := range vlog.filesMap {
@@ -1362,15 +1357,15 @@ func (vlog *valueLog) write(reqs []*request) error {
 		if buf.Len() == 0 {
 			return nil
 		}
-		vlog.elog.Printf("Flushing buffer of size %d to vlog", buf.Len())
+		vlog.opt.Debugf("Flushing buffer of size %d to vlog", buf.Len())
 		n, err := curlf.fd.Write(buf.Bytes())
 		if err != nil {
 			return errors.Wrapf(err, "Unable to write to value log file: %q", curlf.path)
 		}
 		buf.Reset()
 		y.NumWrites.Add(1)
 		y.NumBytesWritten.Add(int64(n))
-		vlog.elog.Printf("Done")
+		vlog.opt.Debugf("Done")
 		atomic.AddUint32(&vlog.writableLogOffset, uint32(n))
 		atomic.StoreUint32(&curlf.size, vlog.writableLogOffset)
 		return nil
@@ -1693,7 +1688,7 @@ func (vlog *valueLog) doRunGC(lf *logFile, discardRatio float64, tr trace.Trace)
 			// This is still the active entry. This would need to be rewritten.

 		} else {
-			vlog.elog.Printf("Reason=%+v\n", r)
+			vlog.opt.Debugf("Reason=%+v\n", r)
 			buf, lf, err := vlog.readValueBytes(vp, s)
 			// we need to decide, whether to unlock the lock file immediately based on the
 			// loading mode. getUnlockCallback will take care of it.
16 changes: 6 additions & 10 deletions y/watermark.go
@@ -21,7 +21,7 @@ import (
 	"context"
 	"sync/atomic"

-	"golang.org/x/net/trace"
+	badger "github.com/dgraph-io/badger/v2"
 )

 type uint64Heap []uint64
@@ -64,17 +64,13 @@ type WaterMark struct {
 	lastIndex uint64
 	Name string
 	markCh chan mark
-	elog trace.EventLog
+	logger badger.Logger
 }

 // Init initializes a WaterMark struct. MUST be called before using it.
-func (w *WaterMark) Init(closer *Closer, eventLogging bool) {
+func (w *WaterMark) Init(closer *Closer, opt badger.Options) {
Review thread on the Init signature change:

Contributor Author: Instead of passing a boolean value, I am passing the Options object so that the logger can be initialized for the WaterMark struct as well. @jarifibrahim, please see if this is the correct way to do it.

Contributor Author: This may cause a circular import error, and I'm not sure how I would do it otherwise. Would creating the logger as a separate module be a better approach?

Contributor: You can move the logger inside the y package. The y package contains a bunch of utility types. (A sketch of this approach follows after the diff below.)

 	w.markCh = make(chan mark, 100)
-	if eventLogging {
-		w.elog = trace.NewEventLog("Watermark", w.Name)
-	} else {
-		w.elog = NoEventLog
-	}
+	w.logger = opt.Logger
 	go w.process(closer)
 }

@@ -168,7 +164,7 @@ func (w *WaterMark) process(closer *Closer) {
 		loop++
 		if len(indices) > 0 && loop%10000 == 0 {
 			min := indices[0]
-			w.elog.Printf("WaterMark %s: Done entry %4d. Size: %4d Watermark: %-4d Looking for: "+
+			w.logger.Debugf("WaterMark %s: Done entry %4d. Size: %4d Watermark: %-4d Looking for: "+
 				"%-4d. Value: %d\n", w.Name, index, len(indices), w.DoneUntil(), min, pending[min])
 		}

@@ -197,7 +193,7 @@ func (w *WaterMark) process(closer *Closer) {

 	if until != doneUntil {
 		AssertTrue(atomic.CompareAndSwapUint64(&w.doneUntil, doneUntil, until))
-		w.elog.Printf("%s: Done until %d. Loops: %d\n", w.Name, until, loops)
+		w.logger.Debugf("%s: Done until %d. Loops: %d\n", w.Name, until, loops)
 	}

 	notifyAndRemove := func(idx uint64, toNotify []chan struct{}) {
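Following up on the review thread above: one way to break the `y` → `badger` import cycle introduced by importing `badger.Logger` and `badger.Options` into y/watermark.go is the reviewer's suggestion of moving the Logger interface into the `y` package itself. A sketch of that approach, assuming `Init` then takes the logger directly (names are illustrative; the merged PR may have settled on something different):

```go
package y

// Logger mirrors the four-method interface Badger exposes in logger.go.
// Defining it in y lets both packages refer to it without y importing badger.
type Logger interface {
	Errorf(format string, args ...interface{})
	Warningf(format string, args ...interface{})
	Infof(format string, args ...interface{})
	Debugf(format string, args ...interface{})
}

// Init initializes a WaterMark struct. MUST be called before using it.
func (w *WaterMark) Init(closer *Closer, logger Logger) {
	w.markCh = make(chan mark, 100)
	w.logger = logger
	go w.process(closer)
}
```

On the caller side, newOracle would then pass `opt.Logger` rather than the whole `Options` value, e.g. `orc.readMark.Init(orc.closer, opt.Logger)`, and go.sum would no longer need the extra `github.com/dgraph-io/badger v1.6.0` entry this commit pulls in.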