Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support disabling conflict detection #1344

Merged
merged 5 commits into from
Jun 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ type Options struct {
// ChecksumVerificationMode decides when db should verify checksums for SSTable blocks.
ChecksumVerificationMode options.ChecksumVerificationMode

// DetectConflicts determines whether the transactions would be checked for
// conflicts. The transactions can be processed at a higher rate when
// conflict detection is disabled.
DetectConflicts bool

// KeepBlockIndicesInCache decides whether to keep the block offsets in the cache or not.
KeepBlockIndicesInCache bool

Expand Down Expand Up @@ -163,6 +168,7 @@ func DefaultOptions(path string) Options {
LogRotatesToFlush: 2,
EncryptionKey: []byte{},
EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days.
DetectConflicts: true,
KeepBlocksInCache: false,
KeepBlockIndicesInCache: false,
}
Expand Down Expand Up @@ -643,6 +649,20 @@ func (opt Options) WithLoadBloomsOnOpen(b bool) Options {
return opt
}

// WithDetectConflicts returns a new Options value with DetectConflicts set to the given value.
//
// DetectConflicts controls whether transactions are checked for conflicts
// before they are committed. With the option set to false, badger can process
// transactions at a higher rate, which may be useful when the user application
// performs its own conflict detection and resolution.
//
// The default value of DetectConflicts is true.
func (opt Options) WithDetectConflicts(b bool) Options {
	// opt is received by value, so working on a copy leaves the caller's
	// Options untouched.
	updated := opt
	updated.DetectConflicts = b
	return updated
}

// WithKeepBlockIndicesInCache returns a new Options value with KeepBlockIndicesInCache set to the
// given value.
//
Expand Down
61 changes: 36 additions & 25 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
)

type oracle struct {
isManaged bool // Does not change value, so no locking required.
isManaged bool // Does not change value, so no locking required.
detectConflicts bool // Determines if the txns should be checked for conflicts.

sync.Mutex // For nextTxnTs and commits.
// writeChLock lock is for ensuring that transactions go to the write
Expand All @@ -58,13 +59,15 @@ type oracle struct {
}

type committedTxn struct {
ts uint64
writes map[uint64]struct{}
ts uint64
// conflictKeys keeps track of the entries written at timestamp ts.
conflictKeys map[uint64]struct{}
}

func newOracle(opt Options) *oracle {
orc := &oracle{
isManaged: opt.managedTxns,
isManaged: opt.managedTxns,
detectConflicts: opt.DetectConflicts,
// We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open.
//
// WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here.
Expand Down Expand Up @@ -148,7 +151,7 @@ func (o *oracle) hasConflict(txn *Txn) bool {
}

for _, ro := range txn.reads {
if _, has := committedTxn.writes[ro]; has {
if _, has := committedTxn.conflictKeys[ro]; has {
return true
}
}
Expand Down Expand Up @@ -182,13 +185,12 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 {

y.AssertTrue(ts >= o.lastCleanupTs)

if !o.isManaged {
// We should ensure that txns are not added to o.committedTxns slice in
// managed mode. If the user doesn't set o.discardTs, the commitTxns
// slice would keep growing in managed mode.
if o.detectConflicts {
// We should ensure that txns are not added to the o.committedTxns slice when
// conflict detection is disabled; otherwise this slice would keep growing.
o.committedTxns = append(o.committedTxns, committedTxn{
ts: ts,
writes: txn.writes,
ts: ts,
conflictKeys: txn.conflictKeys,
})
}

Expand All @@ -203,10 +205,9 @@ func (o *oracle) doneRead(txn *Txn) {
}

func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock
if o.isManaged {
// In managedMode, we do not store any committedTxns. It is expected
// that the system using badger in managedmode performs it's own
// conflict detection.
if !o.detectConflicts {
// When detectConflicts is set to false, we do not store any
// committedTxns and so there's nothing to clean up.
return
}
// Same logic as discardAtOrBelow but unlocked
Expand Down Expand Up @@ -249,10 +250,11 @@ type Txn struct {
readTs uint64
commitTs uint64

update bool // update is used to conditionally keep track of reads.
reads []uint64 // contains fingerprints of keys read.
writes map[uint64]struct{} // contains fingerprints of keys written.
readsLock sync.Mutex // guards the reads slice. See addReadKey.
update bool // update is used to conditionally keep track of reads.
reads []uint64 // contains fingerprints of keys read.
// contains fingerprints of keys written. This is used for conflict detection.
conflictKeys map[uint64]struct{}
readsLock sync.Mutex // guards the reads slice. See addReadKey.

pendingWrites map[string]*Entry // cache stores any writes done by txn.
duplicateWrites []*Entry // Used in managed mode to store duplicate entries.
Expand Down Expand Up @@ -383,8 +385,13 @@ func (txn *Txn) modify(e *Entry) error {
if err := txn.checkSize(e); err != nil {
return err
}
fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
txn.writes[fp] = struct{}{}

// The txn.conflictKeys is used for conflict detection. If conflict detection
// is disabled, we don't need to store key hashes in this map.
if txn.db.opt.DetectConflicts {
fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
txn.conflictKeys[fp] = struct{}{}
}
// If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
// Add the entry to duplicateWrites only if both the entries have different versions. For
// same versions, we will overwrite the existing entry.
Expand Down Expand Up @@ -570,7 +577,7 @@ func (txn *Txn) commitAndSend() (func() error, error) {
// down to here. So, keep this around for at least a couple of months.
// var b strings.Builder
// fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
// txn.readTs, commitTs, txn.reads, txn.writes)
// txn.readTs, commitTs, txn.reads, txn.conflictKeys)
for _, e := range txn.pendingWrites {
processEntry(e)
}
Expand Down Expand Up @@ -646,7 +653,9 @@ func (txn *Txn) commitPrecheck() error {
// If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
// tree won't be updated, so there's no need for any rollback.
func (txn *Txn) Commit() error {
if len(txn.writes) == 0 {
// txn.conflictKeys can be nil (and therefore empty) if conflict detection is
// turned off, so we should check txn.pendingWrites instead.
if len(txn.pendingWrites) == 0 {
return nil // Nothing to do.
}
// Precheck before discarding txn.
Expand Down Expand Up @@ -697,7 +706,7 @@ func (txn *Txn) CommitWith(cb func(error)) {
panic("Nil callback provided to CommitWith")
}

if len(txn.writes) == 0 {
if len(txn.pendingWrites) == 0 {
// Do not run these callbacks from here, because the CommitWith and the
// callback might be acquiring the same locks. Instead run the callback
// from another goroutine.
Expand Down Expand Up @@ -764,7 +773,9 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn {
size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
}
if update {
txn.writes = make(map[uint64]struct{})
if db.opt.DetectConflicts {
txn.conflictKeys = make(map[uint64]struct{})
}
txn.pendingWrites = make(map[string]*Entry)
}
if !isManaged {
Expand Down