DropPrefix: Return error on blocked writes (hypermodeinc#1329)
Signed-off-by: thomassong <[email protected]>
mYmNeo committed Feb 13, 2023
1 parent 736ce8b commit 219ea7a
Showing 2 changed files with 87 additions and 13 deletions.
25 changes: 18 additions & 7 deletions db.go
@@ -1343,13 +1343,16 @@ func (db *DB) Flatten(workers int) error {
}
}

func (db *DB) blockWrite() {
func (db *DB) blockWrite() error {
// Stop accepting new writes.
atomic.StoreInt32(&db.blockWrites, 1)
if !atomic.CompareAndSwapInt32(&db.blockWrites, 0, 1) {
return ErrBlockedWrites
}

// Make all pending writes finish. The following will also close writeCh.
db.closers.writes.SignalAndWait()
db.opt.Infof("Writes flushed. Stopping compactions now...")
return nil
}

func (db *DB) unblockWrite() {
@@ -1360,14 +1363,16 @@ func (db *DB) unblockWrite() {
atomic.StoreInt32(&db.blockWrites, 0)
}

func (db *DB) prepareToDrop() func() {
func (db *DB) prepareToDrop() (func(), error) {
if db.opt.ReadOnly {
panic("Attempting to drop data in read-only mode.")
}
// In order to prepare for a drop, we need to block the incoming writes and
// write them to the db. Then, flush all the pending flush tasks so that we
// don't miss any entries.
db.blockWrite()
if err := db.blockWrite(); err != nil {
return nil, err
}
reqs := make([]*request, 0, 10)
for {
select {
@@ -1382,7 +1387,7 @@ func (db *DB) prepareToDrop() func() {
db.opt.Infof("Resuming writes")
db.startMemoryFlush()
db.unblockWrite()
}
}, nil
}
}
}
@@ -1408,7 +1413,10 @@ func (db *DB) DropAll() error {

func (db *DB) dropAll() (func(), error) {
db.opt.Infof("DropAll called. Blocking writes...")
f := db.prepareToDrop()
f, err := db.prepareToDrop()
if err != nil {
return f, err
}
// prepareToDrop will stop all the incoming writes and flush any pending flush tasks.
// Before we drop, we'll stop the compactions because all the data is going to
// be deleted anyway.
@@ -1458,7 +1466,10 @@ func (db *DB) dropAll() (func(), error) {
// - Resume memtable flushes, compactions and writes.
func (db *DB) DropPrefix(prefixes ...[]byte) error {
db.opt.Infof("DropPrefix Called")
f := db.prepareToDrop()
f, err := db.prepareToDrop()
if err != nil {
return err
}
defer f()

filtered, err := db.filterPrefixesToDrop(prefixes)
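With this change, DropPrefix (and DropAll, via the same prepareToDrop path) can return ErrBlockedWrites when another drop or a Close has already blocked writes. Below is a minimal caller-side sketch of one way to react; the helper name, retry interval, and attempt cap are illustrative, not part of this commit or of badger's API:

```go
package example

import (
	"time"

	"github.com/dgraph-io/badger"
)

// dropPrefixWithRetry is a hypothetical helper: it retries DropPrefix while
// another drop operation holds the write block, and returns any other result
// (nil on success, or a genuine failure) as-is. The 100ms interval and the
// cap of 10 attempts are arbitrary illustrative values.
func dropPrefixWithRetry(db *badger.DB, prefix []byte) error {
	var err error
	for i := 0; i < 10; i++ {
		err = db.DropPrefix(prefix)
		if err != badger.ErrBlockedWrites {
			return err
		}
		time.Sleep(100 * time.Millisecond)
	}
	return err
}
```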
75 changes: 69 additions & 6 deletions db2_test.go
@@ -29,13 +29,16 @@ import (
"path"
"regexp"
"runtime"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/dgraph-io/badger/options"
"github.com/dgraph-io/badger/pb"
"github.com/dgraph-io/badger/table"
"github.com/dgraph-io/badger/y"
"github.com/stretchr/testify/require"
)

func TestTruncateVlogWithClose(t *testing.T) {
@@ -522,12 +525,17 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table {
//
// The test has 3 steps
// Step 1 - Create badger data. It is necessary that the value size is
//          greater than valuethreshold. The value log file size after
//          this step is around 170 bytes.
// Step 2 - Re-open the same badger and simulate a crash. The value log file
//          size after this crash is around 2 GB (we increase the file size to mmap it).
// Step 3 - Re-open the same badger. We should be able to read all the data
//          inserted in the first step.
func TestWindowsDataLoss(t *testing.T) {
if runtime.GOOS != "windows" {
t.Skip("The test is only for Windows.")
@@ -551,7 +559,7 @@ func TestWindowsDataLoss(t *testing.T) {
v := []byte("barValuebarValuebarValuebarValuebarValue")
require.Greater(t, len(v), opt.ValueThreshold)

// 32 bytes length and now it's not working
err := txn.Set(key, v)
require.NoError(t, err)
keyList = append(keyList, key)
@@ -608,3 +616,58 @@ func TestWindowsDataLoss(t *testing.T) {
}
require.ElementsMatch(t, keyList, result)
}

func TestDropAllDropPrefix(t *testing.T) {
key := func(i int) []byte {
return []byte(fmt.Sprintf("%10d", i))
}
val := func(i int) []byte {
return []byte(fmt.Sprintf("%128d", i))
}
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
wb := db.NewWriteBatch()
defer wb.Cancel()

N := 50000

for i := 0; i < N; i++ {
require.NoError(t, wb.Set(key(i), val(i)))
}
require.NoError(t, wb.Flush())

var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
err := db.DropPrefix([]byte("000"))
for err == ErrBlockedWrites {
fmt.Printf("DropPrefix 000 err: %v", err)
err = db.DropPrefix([]byte("000"))
time.Sleep(time.Millisecond * 500)
}
require.NoError(t, err)
}()
go func() {
defer wg.Done()
err := db.DropPrefix([]byte("111"))
for err == ErrBlockedWrites {
fmt.Printf("DropPrefix 111 err: %v", err)
err = db.DropPrefix([]byte("111"))
time.Sleep(time.Millisecond * 500)
}
require.NoError(t, err)
}()
go func() {
time.Sleep(time.Millisecond) // Let drop prefix run first.
defer wg.Done()
err := db.DropAll()
for err == ErrBlockedWrites {
fmt.Printf("dropAll err: %v", err)
err = db.DropAll()
time.Sleep(time.Millisecond * 300)
}
require.NoError(t, err)
}()
wg.Wait()
})
}
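The test above recovers from ErrBlockedWrites by retrying in each goroutine. An application that issues drops from several goroutines could instead serialize them itself; a small sketch under that assumption follows (the wrapper type and its names are hypothetical, not part of badger, and ErrBlockedWrites can still surface if the DB is being closed):

```go
package example

import (
	"sync"

	"github.com/dgraph-io/badger"
)

// serialDropper queues DropAll/DropPrefix calls behind a mutex so that only
// one drop runs at a time, instead of letting concurrent drops race and
// observe ErrBlockedWrites from each other.
type serialDropper struct {
	mu sync.Mutex
	db *badger.DB
}

func (s *serialDropper) DropPrefix(prefixes ...[]byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.db.DropPrefix(prefixes...)
}

func (s *serialDropper) DropAll() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.db.DropAll()
}
```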
