Skip to content

Commit

Permalink
Tests: Do not leave behind state goroutines (#1349)
Browse files Browse the repository at this point in the history
A bunch of tests would start goroutines and never stop them. This PR fixes
that. There should not be any goroutine left after all the tests are
completed.

This PR also adds a db.cleanup method which would be called in case badger.Open
call fails. The cleanup method will stop all the goroutines started by Open
call.
  • Loading branch information
Ibrahim Jarif committed Oct 2, 2020
1 parent 6da7bbf commit 9ae565e
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 20 deletions.
40 changes: 36 additions & 4 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,13 @@ func Open(opt Options) (db *DB, err error) {
orc: newOracle(opt),
pub: newPublisher(),
}
// Cleanup all the goroutines started by badger in case of an error.
defer func() {
if err != nil {
db.cleanup()
db = nil
}
}()

if opt.MaxCacheSize > 0 {
config := ristretto.Config{
Expand Down Expand Up @@ -322,7 +329,6 @@ func Open(opt Options) (db *DB, err error) {
return nil, errors.Wrap(err, "failed to create bf cache")
}
}

if db.opt.InMemory {
db.opt.SyncWrites = false
// If badger is running in memory mode, push everything into the LSM Tree.
Expand All @@ -337,7 +343,7 @@ func Open(opt Options) (db *DB, err error) {
}

if db.registry, err = OpenKeyRegistry(krOpt); err != nil {
return nil, err
return db, err
}
db.calculateSize()
db.closers.updateSize = y.NewCloser(1)
Expand All @@ -346,7 +352,7 @@ func Open(opt Options) (db *DB, err error) {

// newLevelsController potentially loads files in directory.
if db.lc, err = newLevelsController(db, &manifest); err != nil {
return nil, err
return db, err
}

// Initialize vlog struct.
Expand All @@ -366,7 +372,7 @@ func Open(opt Options) (db *DB, err error) {
// Need to pass with timestamp, lsm get removes the last 8 bytes and compares key
vs, err := db.get(headKey)
if err != nil {
return nil, errors.Wrap(err, "Retrieving head")
return db, errors.Wrap(err, "Retrieving head")
}
db.orc.nextTxnTs = vs.Version
var vptr valuePointer
Expand All @@ -378,6 +384,7 @@ func Open(opt Options) (db *DB, err error) {
go db.doWrites(replayCloser)

if err = db.vlog.open(db, vptr, db.replayFunction()); err != nil {
replayCloser.SignalAndWait()
return db, y.Wrapf(err, "During db.vlog.open")
}
replayCloser.SignalAndWait() // Wait for replay to be applied first.
Expand Down Expand Up @@ -408,6 +415,31 @@ func Open(opt Options) (db *DB, err error) {
return db, nil
}

// cleanup stops all the goroutines started by badger. This is used in open to
// cleanup goroutines in case of an error.
func (db *DB) cleanup() {
db.blockCache.Close()
db.bfCache.Close()
db.stopMemoryFlush()
db.stopCompactions()

if db.closers.updateSize != nil {
db.closers.updateSize.Signal()
}
if db.closers.valueGC != nil {
db.closers.valueGC.Signal()
}
if db.closers.writes != nil {
db.closers.writes.Signal()
}
if db.closers.pub != nil {
db.closers.pub.Signal()
}

db.orc.Stop()
db.vlog.Close()
}

// DataCacheMetrics returns the metrics for the underlying data cache.
func (db *DB) DataCacheMetrics() *ristretto.Metrics {
if db.blockCache != nil {
Expand Down
14 changes: 6 additions & 8 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1265,7 +1265,9 @@ func TestExpiry(t *testing.T) {
func TestExpiryImproperDBClose(t *testing.T) {
testReplay := func(opt Options) {

db0, err := Open(opt)
// L0 compaction doesn't affect the test in any way. It is set to allow
// graceful shutdown of db0.
db0, err := Open(opt.WithCompactL0OnClose(false))
require.NoError(t, err)

dur := 1 * time.Hour
Expand All @@ -1280,17 +1282,13 @@ func TestExpiryImproperDBClose(t *testing.T) {
// Simulate a crash by not closing db0, but releasing the locks.
if db0.dirLockGuard != nil {
require.NoError(t, db0.dirLockGuard.release())
db0.dirLockGuard = nil
}
if db0.valueDirGuard != nil {
require.NoError(t, db0.valueDirGuard.release())
db0.valueDirGuard = nil
}
// We need to close vlog to fix the vlog file size. On windows, the vlog file
// is truncated to 2*MaxVlogSize and if we don't close the vlog file, reopening
// it would return Truncate Required Error.
require.NoError(t, db0.vlog.Close())

require.NoError(t, db0.registry.Close())
require.NoError(t, db0.manifest.close())
require.NoError(t, db0.Close())

db1, err := Open(opt)
require.NoError(t, err)
Expand Down
9 changes: 9 additions & 0 deletions levels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,15 @@ func TestL0Stall(t *testing.T) {
t.Log("Timeout triggered")
// Mark this test as successful since L0 is in memory and the
// addition of new table to L0 is supposed to stall.

// Remove tables from level 0 so that the stalled
// compaction can make progress. This does not have any
// effect on the test. This is done so that the goroutine
// stuck on addLevel0Table can make progress and end.
db.lc.levels[0].Lock()
db.lc.levels[0].tables = nil
db.lc.levels[0].Unlock()
<-done
} else {
t.Fatal("Test didn't finish in time")
}
Expand Down
10 changes: 6 additions & 4 deletions managed_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func TestDropAllTwice(t *testing.T) {

// Call DropAll again.
require.NoError(t, db.DropAll())
require.NoError(t, db.Close())
}
t.Run("disk mode", func(t *testing.T) {
dir, err := ioutil.TempDir("", "badger-test")
Expand All @@ -175,7 +176,6 @@ func TestDropAllTwice(t *testing.T) {
opts := getTestOptions("")
opts.InMemory = true
test(t, opts)

})
}

Expand Down Expand Up @@ -278,8 +278,9 @@ func TestDropReadOnly(t *testing.T) {
require.Equal(t, err, ErrWindowsNotSupported)
} else {
require.NoError(t, err)
require.Panics(t, func() { db2.DropAll() })
require.NoError(t, db2.Close())
}
require.Panics(t, func() { db2.DropAll() })
}

func TestWriteAfterClose(t *testing.T) {
Expand Down Expand Up @@ -523,8 +524,9 @@ func TestDropPrefixReadOnly(t *testing.T) {
require.Equal(t, err, ErrWindowsNotSupported)
} else {
require.NoError(t, err)
require.Panics(t, func() { db2.DropPrefix([]byte("key0")) })
require.NoError(t, db2.Close())
}
require.Panics(t, func() { db2.DropPrefix([]byte("key0")) })
}

func TestDropPrefixRace(t *testing.T) {
Expand Down Expand Up @@ -590,7 +592,7 @@ func TestDropPrefixRace(t *testing.T) {
after := numKeysManaged(db, math.MaxUint64)
t.Logf("Before: %d. After dropprefix: %d\n", before, after)
require.True(t, after < before)
db.Close()
require.NoError(t, db.Close())
}

func TestWriteBatchManagedMode(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion value.go
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ func (lf *logFile) init() error {
}

func (vlog *valueLog) Close() error {
if vlog.db.opt.InMemory {
if vlog == nil || vlog.db == nil || vlog.db.opt.InMemory {
return nil
}
// close flushDiscardStats.
Expand Down
6 changes: 3 additions & 3 deletions value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ func TestPenultimateLogCorruption(t *testing.T) {

db0, err := Open(opt)
require.NoError(t, err)
defer func() { require.NoError(t, db0.Close()) }()

h := testHelper{db: db0, t: t}
h.writeRange(0, 7)
Expand All @@ -780,13 +781,12 @@ func TestPenultimateLogCorruption(t *testing.T) {
// Simulate a crash by not closing db0, but releasing the locks.
if db0.dirLockGuard != nil {
require.NoError(t, db0.dirLockGuard.release())
db0.dirLockGuard = nil
}
if db0.valueDirGuard != nil {
require.NoError(t, db0.valueDirGuard.release())
db0.valueDirGuard = nil
}
require.NoError(t, db0.vlog.Close())
require.NoError(t, db0.manifest.close())
require.NoError(t, db0.registry.Close())

opt.Truncate = true
db1, err := Open(opt)
Expand Down

0 comments on commit 9ae565e

Please sign in to comment.