Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support entry version in Write batch #1310

Merged
merged 6 commits into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"

"github.com/dgraph-io/badger/v2/y"
"github.com/pkg/errors"
)

// WriteBatch holds the necessary info to perform batched writes.
Expand Down Expand Up @@ -88,6 +89,16 @@ func (wb *WriteBatch) callback(err error) {
wb.err = err
}

// SetEntryAt behaves like Txn.SetEntry, except that the caller also chooses the
// version (timestamp) the entry is written at. The version is stored on e itself
// before the entry is handed to SetEntry.
//
// It is only usable in managed mode; otherwise an error is returned and e is
// left unmodified.
func (wb *WriteBatch) SetEntryAt(e *Entry, ts uint64) error {
	if managed := wb.db.opt.managedTxns; managed {
		// Stamp the entry with the caller-supplied version, then go through the
		// regular batched write path.
		e.version = ts
		return wb.SetEntry(e)
	}
	return errors.New("SetEntryAt can only be used in managed mode. Use SetEntry instead")
}

// SetEntry is the equivalent of Txn.SetEntry.
func (wb *WriteBatch) SetEntry(e *Entry) error {
wb.Lock()
Expand Down Expand Up @@ -115,6 +126,12 @@ func (wb *WriteBatch) Set(k, v []byte) error {
return wb.SetEntry(e)
}

// DeleteAt is the equivalent of Txn.Delete but accepts a delete timestamp: the
// delete marker for k is written at version ts.
//
// Like SetEntryAt, DeleteAt can be used only in managed mode. It delegates to
// SetEntryAt so that the managed-mode check is enforced in one place; in
// non-managed mode an error is returned instead of writing an entry whose
// version bypasses the commit timestamp assigned at commit time.
func (wb *WriteBatch) DeleteAt(k []byte, ts uint64) error {
	// SetEntryAt stamps the entry with ts after validating the mode.
	e := &Entry{Key: k, meta: bitDelete}
	return wb.SetEntryAt(e, ts)
}

// Delete is equivalent of Txn.Delete.
func (wb *WriteBatch) Delete(k []byte) error {
wb.Lock()
Expand Down
3 changes: 3 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ func TestWriteBatch(t *testing.T) {
wb := db.NewWriteBatch()
defer wb.Cancel()

// Sanity check for SetEntryAt.
require.Error(t, wb.SetEntryAt(&Entry{}, 12))

N, M := 50000, 1000
start := time.Now()

Expand Down
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
txn = append(txn, te)

default:
// This entry is from a rewrite.
// This entry is from a rewrite or via SetEntryAt(..).
toLSM(nk, v)

// We shouldn't get this entry in the middle of a transaction.
Expand Down
8 changes: 8 additions & 0 deletions managed_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ func (db *DB) NewWriteBatchAt(commitTs uint64) *WriteBatch {
wb.txn.commitTs = commitTs
return wb
}
// NewManagedWriteBatch creates a new WriteBatch for a DB opened in managed mode.
// Unlike NewWriteBatchAt, no commit timestamp is set on the batch here; versions
// are expected to be supplied per entry (see WriteBatch.SetEntryAt and
// WriteBatch.DeleteAt).
//
// It panics if the DB is not running with managed transactions.
func (db *DB) NewManagedWriteBatch() *WriteBatch {
	if !db.opt.managedTxns {
		panic("cannot use NewManagedWriteBatch with managedDB=false. Use NewWriteBatch instead")
	}
	return db.newWriteBatch()
}

// CommitAt commits the transaction, following the same logic as Commit(), but
// at the given commit timestamp. This will panic if not used with managed transactions.
Expand Down
47 changes: 47 additions & 0 deletions managed_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,53 @@ func TestWriteBatchManagedMode(t *testing.T) {
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
require.Equal(t, string(key(i)), string(item.Key()))
require.Equal(t, item.Version(), uint64(1))
valcopy, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, val(i), valcopy)
i++
}
require.Equal(t, N, i)
return nil
})
require.NoError(t, err)
})
}
func TestWriteBatchManaged(t *testing.T) {
key := func(i int) []byte {
return []byte(fmt.Sprintf("%10d", i))
}
val := func(i int) []byte {
return []byte(fmt.Sprintf("%128d", i))
}
opt := DefaultOptions("")
opt.managedTxns = true
opt.MaxTableSize = 1 << 15 // This would create multiple transactions in write batch.
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
wb := db.NewManagedWriteBatch()
defer wb.Cancel()

N, M := 50000, 1000
start := time.Now()

for i := 0; i < N; i++ {
require.NoError(t, wb.SetEntryAt(&Entry{Key: key(i), Value: val(i)}, 1))
}
for i := 0; i < M; i++ {
require.NoError(t, wb.DeleteAt(key(i), 2))
}
require.NoError(t, wb.Flush())
t.Logf("Time taken for %d writes (w/ test options): %s\n", N+M, time.Since(start))

err := db.View(func(txn *Txn) error {
itr := txn.NewIterator(DefaultIteratorOptions)
defer itr.Close()

i := M
for itr.Rewind(); itr.Valid(); itr.Next() {
item := itr.Item()
require.Equal(t, string(key(i)), string(item.Key()))
require.Equal(t, item.Version(), uint64(1))
valcopy, err := item.ValueCopy(nil)
require.NoError(t, err)
require.Equal(t, val(i), valcopy)
Expand Down
1 change: 1 addition & 0 deletions structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ type Entry struct {
UserMeta byte
ExpiresAt uint64 // time.Unix
meta byte
version uint64

// Fields maintained internally.
offset uint32
Expand Down
81 changes: 61 additions & 20 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,21 @@ func (txn *Txn) commitAndSend() (func() error, error) {
defer orc.writeChLock.Unlock()

commitTs := orc.newCommitTs(txn)
if commitTs == 0 {
// The commitTs can be zero if the transaction is running in managed mode.
// Individual entries might have their own timestamps.
if commitTs == 0 && !txn.db.opt.managedTxns {
return nil, ErrConflict
}

keepTogether := true
for _, e := range txn.pendingWrites {
if e.version == 0 {
e.version = commitTs
} else {
keepTogether = false
}
}

// The following debug information is what led to determining the cause of
// bank txn violation bug, and it took a whole bunch of effort to narrow it
// down to here. So, keep this around for at least a couple of months.
Expand All @@ -478,21 +489,30 @@ func (txn *Txn) commitAndSend() (func() error, error) {
// txn.readTs, commitTs, txn.reads, txn.writes)
entries := make([]*Entry, 0, len(txn.pendingWrites)+1)
for _, e := range txn.pendingWrites {
// fmt.Fprintf(&b, "[%q : %q], ", e.Key, e.Value)

// Suffix the keys with commit ts, so the key versions are sorted in
// descending order of commit timestamp.
e.Key = y.KeyWithTs(e.Key, commitTs)
e.meta |= bitTxn
e.Key = y.KeyWithTs(e.Key, e.version)
// Add bitTxn only if these entries are part of a transaction. We
// support SetEntryAt(..) in managed mode which means a single
// transaction can have entries with different timestamps. If entries
// in a single transaction have different timestamps, we don't add the
// transaction markers.
if keepTogether {
e.meta |= bitTxn
}
entries = append(entries, e)
}
// log.Printf("%s\n", b.String())
e := &Entry{
Key: y.KeyWithTs(txnKey, commitTs),
Value: []byte(strconv.FormatUint(commitTs, 10)),
meta: bitFinTxn,

if keepTogether {
// CommitTs should not be zero if we're inserting transaction markers.
y.AssertTrue(commitTs != 0)
e := &Entry{
Key: y.KeyWithTs(txnKey, commitTs),
Value: []byte(strconv.FormatUint(commitTs, 10)),
meta: bitFinTxn,
}
entries = append(entries, e)
}
entries = append(entries, e)

req, err := txn.db.sendToWriteCh(entries)
if err != nil {
Expand All @@ -510,13 +530,26 @@ func (txn *Txn) commitAndSend() (func() error, error) {
return ret, nil
}

func (txn *Txn) commitPrecheck() {
if txn.commitTs == 0 && txn.db.opt.managedTxns {
panic("Commit cannot be called with managedDB=true. Use CommitAt.")
}
// commitPrecheck validates that txn is in a state where it may be committed.
// It returns an error (rather than panicking) when:
//   - the transaction has already been discarded, or
//   - the DB is in managed mode, no commit timestamp has been set, and none of
//     the pending writes carries its own version.
//
// The second case matters because commitAndSend adds transaction markers when
// no pending entry has an explicit version, and those markers require a
// non-zero commit timestamp. This typically happens when Commit is called
// instead of CommitAt in managed mode.
func (txn *Txn) commitPrecheck() error {
	if txn.discarded {
		return errors.New("Trying to commit a discarded txn")
	}

	// A zero commit timestamp can only be a problem in managed mode, so skip
	// the scan over pending writes in every other situation.
	if !txn.db.opt.managedTxns || txn.commitTs != 0 {
		return nil
	}

	for _, e := range txn.pendingWrites {
		if e.version != 0 {
			// At least one entry has an explicit version, so the entries are
			// not kept together under transaction markers and a zero commit
			// timestamp is acceptable.
			return nil
		}
	}

	// All entries (possibly none) rely on the commit timestamp, which is zero.
	return errors.New("CommitTs cannot be zero. Please use CommitAt instead")
}

// Commit commits the transaction, following these steps:
Expand All @@ -538,7 +571,10 @@ func (txn *Txn) commitPrecheck() {
// If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
// tree won't be updated, so there's no need for any rollback.
func (txn *Txn) Commit() error {
txn.commitPrecheck() // Precheck before discarding txn.
// Precheck before discarding txn.
if err := txn.commitPrecheck(); err != nil {
return err
}
defer txn.Discard()

if len(txn.writes) == 0 {
Expand Down Expand Up @@ -583,13 +619,18 @@ func runTxnCallback(cb *txnCb) {
// so it is safe to increment sync.WaitGroup before calling CommitWith, and
// decrementing it in the callback; to block until all callbacks are run.
func (txn *Txn) CommitWith(cb func(error)) {
txn.commitPrecheck() // Precheck before discarding txn.
defer txn.Discard()

if cb == nil {
panic("Nil callback provided to CommitWith")
}

// Precheck before discarding txn.
if err := txn.commitPrecheck(); err != nil {
cb(err)
return
}

defer txn.Discard()

if len(txn.writes) == 0 {
// Do not run these callbacks from here, because the CommitWith and the
// callback might be acquiring the same locks. Instead run the callback
Expand Down
2 changes: 1 addition & 1 deletion txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ func TestManagedDB(t *testing.T) {
for i := 0; i <= 3; i++ {
require.NoError(t, txn.SetEntry(NewEntry(key(i), val(i))))
}
require.Panics(t, func() { txn.Commit() })
require.Error(t, txn.Commit())
require.NoError(t, txn.CommitAt(3, nil))

// Read data at t=2.
Expand Down