Skip to content

Commit

Permalink
Support disabling conflict detection (#1344)
Browse files Browse the repository at this point in the history
This commit adds support for disabling conflict detection by setting the
option.DetectConflicts=false. When conflict detection is disabled
badger will not store information required to detect the conflict. This
reduces the amount of memory used by transactions running parallely and
also allows transactions to be processed at a faster rate.

fixes BADGER-207
  • Loading branch information
Ibrahim Jarif authored Jun 3, 2020
1 parent fd89894 commit 056d859
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 25 deletions.
20 changes: 20 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ type Options struct {
// ChecksumVerificationMode decides when db should verify checksums for SSTable blocks.
ChecksumVerificationMode options.ChecksumVerificationMode

// DetectConflicts determines whether the transactions would be checked for
// conflicts. The transactions can be processed at a higher rate when
// conflict detection is disabled.
DetectConflicts bool

// KeepBlockIndicesInCache decides whether to keep the block offsets in the cache or not.
KeepBlockIndicesInCache bool

Expand Down Expand Up @@ -163,6 +168,7 @@ func DefaultOptions(path string) Options {
LogRotatesToFlush: 2,
EncryptionKey: []byte{},
EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days.
DetectConflicts: true,
KeepBlocksInCache: false,
KeepBlockIndicesInCache: false,
}
Expand Down Expand Up @@ -643,6 +649,20 @@ func (opt Options) WithLoadBloomsOnOpen(b bool) Options {
return opt
}

// WithDetectConflicts returns a new Options value with DetectConflicts set to the given value.
//
// Detect conflicts options determines if the transactions would be checked for
// conflicts before committing them. When this option is set to false
// (detectConflicts=false) badger can process transactions at a higher rate.
// Setting this options to false might be useful when the user application
// deals with conflict detection and resolution.
//
// The default value of Detect conflicts is True.
func (opt Options) WithDetectConflicts(b bool) Options {
opt.DetectConflicts = b
return opt
}

// WithKeepBlockIndicesInCache returns a new Option value with KeepBlockOffsetInCache set to the
// given value.
//
Expand Down
61 changes: 36 additions & 25 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
)

type oracle struct {
isManaged bool // Does not change value, so no locking required.
isManaged bool // Does not change value, so no locking required.
detectConflicts bool // Determines if the txns should be checked for conflicts.

sync.Mutex // For nextTxnTs and commits.
// writeChLock lock is for ensuring that transactions go to the write
Expand All @@ -58,13 +59,15 @@ type oracle struct {
}

type committedTxn struct {
ts uint64
writes map[uint64]struct{}
ts uint64
// ConflictKeys Keeps track of the entries written at timestamp ts.
conflictKeys map[uint64]struct{}
}

func newOracle(opt Options) *oracle {
orc := &oracle{
isManaged: opt.managedTxns,
isManaged: opt.managedTxns,
detectConflicts: opt.DetectConflicts,
// We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open.
//
// WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here.
Expand Down Expand Up @@ -148,7 +151,7 @@ func (o *oracle) hasConflict(txn *Txn) bool {
}

for _, ro := range txn.reads {
if _, has := committedTxn.writes[ro]; has {
if _, has := committedTxn.conflictKeys[ro]; has {
return true
}
}
Expand Down Expand Up @@ -182,13 +185,12 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 {

y.AssertTrue(ts >= o.lastCleanupTs)

if !o.isManaged {
// We should ensure that txns are not added to o.committedTxns slice in
// managed mode. If the user doesn't set o.discardTs, the commitTxns
// slice would keep growing in managed mode.
if o.detectConflicts {
// We should ensure that txns are not added to o.committedTxns slice when
// conflict detection is disabled otherwise this slice would keep growing.
o.committedTxns = append(o.committedTxns, committedTxn{
ts: ts,
writes: txn.writes,
ts: ts,
conflictKeys: txn.conflictKeys,
})
}

Expand All @@ -203,10 +205,9 @@ func (o *oracle) doneRead(txn *Txn) {
}

func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock
if o.isManaged {
// In managedMode, we do not store any committedTxns. It is expected
// that the system using badger in managedmode performs it's own
// conflict detection.
if !o.detectConflicts {
// When detectConflicts is set to false, we do not store any
// committedTxns and so there's nothing to clean up.
return
}
// Same logic as discardAtOrBelow but unlocked
Expand Down Expand Up @@ -249,10 +250,11 @@ type Txn struct {
readTs uint64
commitTs uint64

update bool // update is used to conditionally keep track of reads.
reads []uint64 // contains fingerprints of keys read.
writes map[uint64]struct{} // contains fingerprints of keys written.
readsLock sync.Mutex // guards the reads slice. See addReadKey.
update bool // update is used to conditionally keep track of reads.
reads []uint64 // contains fingerprints of keys read.
// contains fingerprints of keys written. This is used for conflict detection.
conflictKeys map[uint64]struct{}
readsLock sync.Mutex // guards the reads slice. See addReadKey.

pendingWrites map[string]*Entry // cache stores any writes done by txn.
duplicateWrites []*Entry // Used in managed mode to store duplicate entries.
Expand Down Expand Up @@ -383,8 +385,13 @@ func (txn *Txn) modify(e *Entry) error {
if err := txn.checkSize(e); err != nil {
return err
}
fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
txn.writes[fp] = struct{}{}

// The txn.conflictKeys is used for conflict detection. If conflict detection
// is disabled, we don't need to store key hashes in this map.
if txn.db.opt.DetectConflicts {
fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
txn.conflictKeys[fp] = struct{}{}
}
// If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
// Add the entry to duplicateWrites only if both the entries have different versions. For
// same versions, we will overwrite the existing entry.
Expand Down Expand Up @@ -570,7 +577,7 @@ func (txn *Txn) commitAndSend() (func() error, error) {
// down to here. So, keep this around for at least a couple of months.
// var b strings.Builder
// fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
// txn.readTs, commitTs, txn.reads, txn.writes)
// txn.readTs, commitTs, txn.reads, txn.conflictKeys)
for _, e := range txn.pendingWrites {
processEntry(e)
}
Expand Down Expand Up @@ -646,7 +653,9 @@ func (txn *Txn) commitPrecheck() error {
// If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
// tree won't be updated, so there's no need for any rollback.
func (txn *Txn) Commit() error {
if len(txn.writes) == 0 {
// txn.conflictKeys can be zero if conflict detection is turned off. So we
// should check txn.pendingWrites.
if len(txn.pendingWrites) == 0 {
return nil // Nothing to do.
}
// Precheck before discarding txn.
Expand Down Expand Up @@ -697,7 +706,7 @@ func (txn *Txn) CommitWith(cb func(error)) {
panic("Nil callback provided to CommitWith")
}

if len(txn.writes) == 0 {
if len(txn.pendingWrites) == 0 {
// Do not run these callbacks from here, because the CommitWith and the
// callback might be acquiring the same locks. Instead run the callback
// from another goroutine.
Expand Down Expand Up @@ -764,7 +773,9 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn {
size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
}
if update {
txn.writes = make(map[uint64]struct{})
if db.opt.DetectConflicts {
txn.conflictKeys = make(map[uint64]struct{})
}
txn.pendingWrites = make(map[string]*Entry)
}
if !isManaged {
Expand Down

0 comments on commit 056d859

Please sign in to comment.