From 219ea7a617bb0562073344050d1eb3603a1623c5 Mon Sep 17 00:00:00 2001 From: thomassong Date: Mon, 16 Jan 2023 19:46:56 +0800 Subject: [PATCH] DropPrefix: Return error on blocked writes (#1329) Signed-off-by: thomassong --- db.go | 25 +++++++++++++----- db2_test.go | 75 ++++++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 87 insertions(+), 13 deletions(-) diff --git a/db.go b/db.go index 6a42893a7..e07d9bb2c 100644 --- a/db.go +++ b/db.go @@ -1343,13 +1343,16 @@ func (db *DB) Flatten(workers int) error { } } -func (db *DB) blockWrite() { +func (db *DB) blockWrite() error { // Stop accepting new writes. - atomic.StoreInt32(&db.blockWrites, 1) + if !atomic.CompareAndSwapInt32(&db.blockWrites, 0, 1) { + return ErrBlockedWrites + } // Make all pending writes finish. The following will also close writeCh. db.closers.writes.SignalAndWait() db.opt.Infof("Writes flushed. Stopping compactions now...") + return nil } func (db *DB) unblockWrite() { @@ -1360,14 +1363,16 @@ func (db *DB) unblockWrite() { atomic.StoreInt32(&db.blockWrites, 0) } -func (db *DB) prepareToDrop() func() { +func (db *DB) prepareToDrop() (func(), error) { if db.opt.ReadOnly { panic("Attempting to drop data in read-only mode.") } // In order prepare for drop, we need to block the incoming writes and // write it to db. Then, flush all the pending flushtask. So that, we // don't miss any entries. - db.blockWrite() + if err := db.blockWrite(); err != nil { + return nil, err + } reqs := make([]*request, 0, 10) for { select { @@ -1382,7 +1387,7 @@ func (db *DB) prepareToDrop() func() { db.opt.Infof("Resuming writes") db.startMemoryFlush() db.unblockWrite() - } + }, nil } } } @@ -1408,7 +1413,10 @@ func (db *DB) DropAll() error { func (db *DB) dropAll() (func(), error) { db.opt.Infof("DropAll called. Blocking writes...") - f := db.prepareToDrop() + f, err := db.prepareToDrop() + if err != nil { + return f, err + } // prepareToDrop will stop all the incomming write and flushes any pending flush tasks. // Before we drop, we'll stop the compaction because anyways all the datas are going to // be deleted. @@ -1458,7 +1466,10 @@ func (db *DB) dropAll() (func(), error) { // - Resume memtable flushes, compactions and writes. func (db *DB) DropPrefix(prefixes ...[]byte) error { db.opt.Infof("DropPrefix Called") - f := db.prepareToDrop() + f, err := db.prepareToDrop() + if err != nil { + return err + } defer f() filtered, err := db.filterPrefixesToDrop(prefixes) diff --git a/db2_test.go b/db2_test.go index 070cb7ff4..058b03bde 100644 --- a/db2_test.go +++ b/db2_test.go @@ -29,13 +29,16 @@ import ( "path" "regexp" "runtime" + "sync" "testing" + "time" + + "github.com/stretchr/testify/require" "github.com/dgraph-io/badger/options" "github.com/dgraph-io/badger/pb" "github.com/dgraph-io/badger/table" "github.com/dgraph-io/badger/y" - "github.com/stretchr/testify/require" ) func TestTruncateVlogWithClose(t *testing.T) { @@ -522,12 +525,17 @@ func createTableWithRange(t *testing.T, db *DB, start, end int) *table.Table { // // The test has 3 steps // Step 1 - Create badger data. It is necessary that the value size is -// greater than valuethreshold. The value log file size after -// this step is around 170 bytes. +// +// greater than valuethreshold. The value log file size after +// this step is around 170 bytes. +// // Step 2 - Re-open the same badger and simulate a crash. The value log file -// size after this crash is around 2 GB (we increase the file size to mmap it). +// +// size after this crash is around 2 GB (we increase the file size to mmap it). +// // Step 3 - Re-open the same badger. We should be able to read all the data -// inserted in the first step. +// +// inserted in the first step. func TestWindowsDataLoss(t *testing.T) { if runtime.GOOS != "windows" { t.Skip("The test is only for Windows.") @@ -551,7 +559,7 @@ func TestWindowsDataLoss(t *testing.T) { v := []byte("barValuebarValuebarValuebarValuebarValue") require.Greater(t, len(v), opt.ValueThreshold) - //32 bytes length and now it's not working + // 32 bytes length and now it's not working err := txn.Set(key, v) require.NoError(t, err) keyList = append(keyList, key) @@ -608,3 +616,58 @@ func TestWindowsDataLoss(t *testing.T) { } require.ElementsMatch(t, keyList, result) } + +func TestDropAllDropPrefix(t *testing.T) { + key := func(i int) []byte { + return []byte(fmt.Sprintf("%10d", i)) + } + val := func(i int) []byte { + return []byte(fmt.Sprintf("%128d", i)) + } + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + wb := db.NewWriteBatch() + defer wb.Cancel() + + N := 50000 + + for i := 0; i < N; i++ { + require.NoError(t, wb.Set(key(i), val(i))) + } + require.NoError(t, wb.Flush()) + + var wg sync.WaitGroup + wg.Add(3) + go func() { + defer wg.Done() + err := db.DropPrefix([]byte("000")) + for err == ErrBlockedWrites { + fmt.Printf("DropPrefix 000 err: %v", err) + err = db.DropPrefix([]byte("000")) + time.Sleep(time.Millisecond * 500) + } + require.NoError(t, err) + }() + go func() { + defer wg.Done() + err := db.DropPrefix([]byte("111")) + for err == ErrBlockedWrites { + fmt.Printf("DropPrefix 111 err: %v", err) + err = db.DropPrefix([]byte("111")) + time.Sleep(time.Millisecond * 500) + } + require.NoError(t, err) + }() + go func() { + time.Sleep(time.Millisecond) // Let drop prefix run first. + defer wg.Done() + err := db.DropAll() + for err == ErrBlockedWrites { + fmt.Printf("dropAll err: %v", err) + err = db.DropAll() + time.Sleep(time.Millisecond * 300) + } + require.NoError(t, err) + }() + wg.Wait() + }) +}