diff --git a/db.go b/db.go index 79b332d23..fd3e85ace 100644 --- a/db.go +++ b/db.go @@ -291,6 +291,13 @@ func Open(opt Options) (db *DB, err error) { orc: newOracle(opt), pub: newPublisher(), } + // Cleanup all the goroutines started by badger in case of an error. + defer func() { + if err != nil { + db.cleanup() + db = nil + } + }() if opt.MaxCacheSize > 0 { config := ristretto.Config{ @@ -322,7 +329,6 @@ func Open(opt Options) (db *DB, err error) { return nil, errors.Wrap(err, "failed to create bf cache") } } - if db.opt.InMemory { db.opt.SyncWrites = false // If badger is running in memory mode, push everything into the LSM Tree. @@ -337,7 +343,7 @@ func Open(opt Options) (db *DB, err error) { } if db.registry, err = OpenKeyRegistry(krOpt); err != nil { - return nil, err + return db, err } db.calculateSize() db.closers.updateSize = y.NewCloser(1) @@ -346,7 +352,7 @@ func Open(opt Options) (db *DB, err error) { // newLevelsController potentially loads files in directory. if db.lc, err = newLevelsController(db, &manifest); err != nil { - return nil, err + return db, err } // Initialize vlog struct. @@ -366,7 +372,7 @@ func Open(opt Options) (db *DB, err error) { // Need to pass with timestamp, lsm get removes the last 8 bytes and compares key vs, err := db.get(headKey) if err != nil { - return nil, errors.Wrap(err, "Retrieving head") + return db, errors.Wrap(err, "Retrieving head") } db.orc.nextTxnTs = vs.Version var vptr valuePointer @@ -378,6 +384,7 @@ func Open(opt Options) (db *DB, err error) { go db.doWrites(replayCloser) if err = db.vlog.open(db, vptr, db.replayFunction()); err != nil { + replayCloser.SignalAndWait() return db, y.Wrapf(err, "During db.vlog.open") } replayCloser.SignalAndWait() // Wait for replay to be applied first. @@ -408,6 +415,31 @@ func Open(opt Options) (db *DB, err error) { return db, nil } +// cleanup stops all the goroutines started by badger. This is used in open to +// cleanup goroutines in case of an error. +func (db *DB) cleanup() { + db.blockCache.Close() + db.bfCache.Close() + db.stopMemoryFlush() + db.stopCompactions() + + if db.closers.updateSize != nil { + db.closers.updateSize.Signal() + } + if db.closers.valueGC != nil { + db.closers.valueGC.Signal() + } + if db.closers.writes != nil { + db.closers.writes.Signal() + } + if db.closers.pub != nil { + db.closers.pub.Signal() + } + + db.orc.Stop() + db.vlog.Close() +} + // DataCacheMetrics returns the metrics for the underlying data cache. func (db *DB) DataCacheMetrics() *ristretto.Metrics { if db.blockCache != nil { diff --git a/db_test.go b/db_test.go index 95ccdf30f..af80a2d5e 100644 --- a/db_test.go +++ b/db_test.go @@ -1265,7 +1265,9 @@ func TestExpiry(t *testing.T) { func TestExpiryImproperDBClose(t *testing.T) { testReplay := func(opt Options) { - db0, err := Open(opt) + // L0 compaction doesn't affect the test in any way. It is set to allow + // graceful shutdown of db0. + db0, err := Open(opt.WithCompactL0OnClose(false)) require.NoError(t, err) dur := 1 * time.Hour @@ -1280,17 +1282,13 @@ func TestExpiryImproperDBClose(t *testing.T) { // Simulate a crash by not closing db0, but releasing the locks. if db0.dirLockGuard != nil { require.NoError(t, db0.dirLockGuard.release()) + db0.dirLockGuard = nil } if db0.valueDirGuard != nil { require.NoError(t, db0.valueDirGuard.release()) + db0.valueDirGuard = nil } - // We need to close vlog to fix the vlog file size. On windows, the vlog file - // is truncated to 2*MaxVlogSize and if we don't close the vlog file, reopening - // it would return Truncate Required Error. - require.NoError(t, db0.vlog.Close()) - - require.NoError(t, db0.registry.Close()) - require.NoError(t, db0.manifest.close()) + require.NoError(t, db0.Close()) db1, err := Open(opt) require.NoError(t, err) diff --git a/levels_test.go b/levels_test.go index 8c7df15bb..aa83be8af 100644 --- a/levels_test.go +++ b/levels_test.go @@ -538,6 +538,15 @@ func TestL0Stall(t *testing.T) { t.Log("Timeout triggered") // Mark this test as successful since L0 is in memory and the // addition of new table to L0 is supposed to stall. + + // Remove tables from level 0 so that the stalled + // compaction can make progress. This does not have any + // effect on the test. This is done so that the goroutine + // stuck on addLevel0Table can make progress and end. + db.lc.levels[0].Lock() + db.lc.levels[0].tables = nil + db.lc.levels[0].Unlock() + <-done } else { t.Fatal("Test didn't finish in time") } diff --git a/managed_db_test.go b/managed_db_test.go index 932744bf1..959000657 100644 --- a/managed_db_test.go +++ b/managed_db_test.go @@ -162,6 +162,7 @@ func TestDropAllTwice(t *testing.T) { // Call DropAll again. require.NoError(t, db.DropAll()) + require.NoError(t, db.Close()) } t.Run("disk mode", func(t *testing.T) { dir, err := ioutil.TempDir("", "badger-test") @@ -175,7 +176,6 @@ func TestDropAllTwice(t *testing.T) { opts := getTestOptions("") opts.InMemory = true test(t, opts) - }) } @@ -278,8 +278,9 @@ func TestDropReadOnly(t *testing.T) { require.Equal(t, err, ErrWindowsNotSupported) } else { require.NoError(t, err) + require.Panics(t, func() { db2.DropAll() }) + require.NoError(t, db2.Close()) } - require.Panics(t, func() { db2.DropAll() }) } func TestWriteAfterClose(t *testing.T) { @@ -523,8 +524,9 @@ func TestDropPrefixReadOnly(t *testing.T) { require.Equal(t, err, ErrWindowsNotSupported) } else { require.NoError(t, err) + require.Panics(t, func() { db2.DropPrefix([]byte("key0")) }) + require.NoError(t, db2.Close()) } - require.Panics(t, func() { db2.DropPrefix([]byte("key0")) }) } func TestDropPrefixRace(t *testing.T) { @@ -590,7 +592,7 @@ func TestDropPrefixRace(t *testing.T) { after := numKeysManaged(db, math.MaxUint64) t.Logf("Before: %d. After dropprefix: %d\n", before, after) require.True(t, after < before) - db.Close() + require.NoError(t, db.Close()) } func TestWriteBatchManagedMode(t *testing.T) { diff --git a/value.go b/value.go index b68ef10ba..3bcac0e38 100644 --- a/value.go +++ b/value.go @@ -1219,7 +1219,7 @@ func (lf *logFile) init() error { } func (vlog *valueLog) Close() error { - if vlog.db.opt.InMemory { + if vlog == nil || vlog.db == nil || vlog.db.opt.InMemory { return nil } // close flushDiscardStats. diff --git a/value_test.go b/value_test.go index 8c3c67a8a..107a90b4b 100644 --- a/value_test.go +++ b/value_test.go @@ -762,6 +762,7 @@ func TestPenultimateLogCorruption(t *testing.T) { db0, err := Open(opt) require.NoError(t, err) + defer func() { require.NoError(t, db0.Close()) }() h := testHelper{db: db0, t: t} h.writeRange(0, 7) @@ -780,13 +781,12 @@ func TestPenultimateLogCorruption(t *testing.T) { // Simulate a crash by not closing db0, but releasing the locks. if db0.dirLockGuard != nil { require.NoError(t, db0.dirLockGuard.release()) + db0.dirLockGuard = nil } if db0.valueDirGuard != nil { require.NoError(t, db0.valueDirGuard.release()) + db0.valueDirGuard = nil } - require.NoError(t, db0.vlog.Close()) - require.NoError(t, db0.manifest.close()) - require.NoError(t, db0.registry.Close()) opt.Truncate = true db1, err := Open(opt)