Skip to content

Commit

Permalink
Revert "Support disabling conflict detection (#1344)"
Browse files Browse the repository at this point in the history
This reverts commit 056d859.
  • Loading branch information
Ibrahim Jarif committed Oct 1, 2020
1 parent feb98a8 commit deb1181
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 47 deletions.
43 changes: 29 additions & 14 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ type Options struct {
// ChecksumVerificationMode decides when db should verify checksums for SSTable blocks.
ChecksumVerificationMode options.ChecksumVerificationMode

// DetectConflicts determines whether the transactions would be checked for
// conflicts. The transactions can be processed at a higher rate when
// conflict detection is disabled.
DetectConflicts bool
// KeepBlockIndicesInCache decides whether to keep the block offsets in the cache or not.
KeepBlockIndicesInCache bool

// KeepBlocksInCache decides whether to keep the sst blocks in the cache or not.
KeepBlocksInCache bool

// Transaction start and commit timestamps are managed by end-user.
// This is only useful for databases built on top of Badger (like Dgraph).
Expand Down Expand Up @@ -163,7 +164,8 @@ func DefaultOptions(path string) Options {
LogRotatesToFlush: 2,
EncryptionKey: []byte{},
EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days.
DetectConflicts: true,
KeepBlocksInCache: false,
KeepBlockIndicesInCache: false,
}
}

Expand Down Expand Up @@ -643,16 +645,29 @@ func (opt Options) WithIndexCacheSize(size int64) Options {
return opt
}

// WithDetectConflicts returns a new Options value with DetectConflicts set to the given value.
// WithKeepBlockIndicesInCache returns a new Option value with KeepBlockOffsetInCache set to the
// given value.
//
// When this option is set badger will store the block offsets in a cache along with the blocks.
// The size of the cache is determined by the MaxCacheSize option.If the MaxCacheSize is set to
// zero, then MaxCacheSize is set to 100 mb. When indices are stored in the cache, the read
// performance might be affected but the cache limits the amount of memory used by the indices.
//
// The default value of KeepBlockOffsetInCache is false.
func (opt Options) WithKeepBlockIndicesInCache(val bool) Options {
opt.KeepBlockIndicesInCache = val
return opt
}

// WithKeepBlocksInCache returns a new Option value with KeepBlocksInCache set to the
// given value.
//
// Detect conflicts options determines if the transactions would be checked for
// conflicts before committing them. When this option is set to false
// (detectConflicts=false) badger can process transactions at a higher rate.
// Setting this options to false might be useful when the user application
// deals with conflict detection and resolution.
// When this option is set badger will store the block in the cache. The size of the cache is
// determined by the MaxCacheSize option.If the MaxCacheSize is set to zero,
// then MaxCacheSize is set to 100 mb.
//
// The default value of Detect conflicts is True.
func (opt Options) WithDetectConflicts(b bool) Options {
opt.DetectConflicts = b
// The default value of KeepBlocksInCache is false.
func (opt Options) WithKeepBlocksInCache(val bool) Options {
opt.KeepBlocksInCache = val
return opt
}
56 changes: 23 additions & 33 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ import (
)

type oracle struct {
isManaged bool // Does not change value, so no locking required.
detectConflicts bool // Determines if the txns should be checked for conflicts.
isManaged bool // Does not change value, so no locking required.

sync.Mutex // For nextTxnTs and commits.
// writeChLock lock is for ensuring that transactions go to the write
Expand All @@ -59,15 +58,13 @@ type oracle struct {
}

type committedTxn struct {
ts uint64
// ConflictKeys Keeps track of the entries written at timestamp ts.
conflictKeys map[uint64]struct{}
ts uint64
writes map[uint64]struct{}
}

func newOracle(opt Options) *oracle {
orc := &oracle{
isManaged: opt.managedTxns,
detectConflicts: opt.DetectConflicts,
isManaged: opt.managedTxns,
// We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open.
//
// WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here.
Expand Down Expand Up @@ -151,7 +148,7 @@ func (o *oracle) hasConflict(txn *Txn) bool {
}

for _, ro := range txn.reads {
if _, has := committedTxn.conflictKeys[ro]; has {
if _, has := committedTxn.writes[ro]; has {
return true
}
}
Expand Down Expand Up @@ -185,12 +182,13 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 {

y.AssertTrue(ts >= o.lastCleanupTs)

if o.detectConflicts {
// We should ensure that txns are not added to o.committedTxns slice when
// conflict detection is disabled otherwise this slice would keep growing.
if !o.isManaged {
// We should ensure that txns are not added to o.committedTxns slice in
// managed mode. If the user doesn't set o.discardTs, the commitTxns
// slice would keep growing in managed mode.
o.committedTxns = append(o.committedTxns, committedTxn{
ts: ts,
conflictKeys: txn.conflictKeys,
ts: ts,
writes: txn.writes,
})
}

Expand All @@ -205,9 +203,10 @@ func (o *oracle) doneRead(txn *Txn) {
}

func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock
if !o.detectConflicts {
// When detectConflicts is set to false, we do not store any
// committedTxns and so there's nothing to clean up.
if o.isManaged {
// In managedMode, we do not store any committedTxns. It is expected
// that the system using badger in managedmode performs it's own
// conflict detection.
return
}
// Same logic as discardAtOrBelow but unlocked
Expand Down Expand Up @@ -255,8 +254,8 @@ type Txn struct {

reads []uint64 // contains fingerprints of keys read.
// contains fingerprints of keys written. This is used for conflict detection.
conflictKeys map[uint64]struct{}
readsLock sync.Mutex // guards the reads slice. See addReadKey.
writes map[uint64]struct{} // contains fingerprints of keys written.
readsLock sync.Mutex // guards the reads slice. See addReadKey.

pendingWrites map[string]*Entry // cache stores any writes done by txn.
duplicateWrites []*Entry // Used in managed mode to store duplicate entries.
Expand Down Expand Up @@ -384,13 +383,8 @@ func (txn *Txn) modify(e *Entry) error {
if err := txn.checkSize(e); err != nil {
return err
}

// The txn.conflictKeys is used for conflict detection. If conflict detection
// is disabled, we don't need to store key hashes in this map.
if txn.db.opt.DetectConflicts {
fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
txn.conflictKeys[fp] = struct{}{}
}
fp := z.MemHash(e.Key) // Avoid dealing with byte arrays.
txn.writes[fp] = struct{}{}
// If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice.
// Add the entry to duplicateWrites only if both the entries have different versions. For
// same versions, we will overwrite the existing entry.
Expand Down Expand Up @@ -575,7 +569,7 @@ func (txn *Txn) commitAndSend() (func() error, error) {
// down to here. So, keep this around for at least a couple of months.
// var b strings.Builder
// fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ",
// txn.readTs, commitTs, txn.reads, txn.conflictKeys)
// txn.readTs, commitTs, txn.reads, txn.writes)
for _, e := range txn.pendingWrites {
processEntry(e)
}
Expand Down Expand Up @@ -651,9 +645,7 @@ func (txn *Txn) commitPrecheck() error {
// If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM
// tree won't be updated, so there's no need for any rollback.
func (txn *Txn) Commit() error {
// txn.conflictKeys can be zero if conflict detection is turned off. So we
// should check txn.pendingWrites.
if len(txn.pendingWrites) == 0 {
if len(txn.writes) == 0 {
return nil // Nothing to do.
}
// Precheck before discarding txn.
Expand Down Expand Up @@ -704,7 +696,7 @@ func (txn *Txn) CommitWith(cb func(error)) {
panic("Nil callback provided to CommitWith")
}

if len(txn.pendingWrites) == 0 {
if len(txn.writes) == 0 {
// Do not run these callbacks from here, because the CommitWith and the
// callback might be acquiring the same locks. Instead run the callback
// from another goroutine.
Expand Down Expand Up @@ -771,9 +763,7 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn {
size: int64(len(txnKey) + 10), // Some buffer for the extra entry.
}
if update {
if db.opt.DetectConflicts {
txn.conflictKeys = make(map[uint64]struct{})
}
txn.writes = make(map[uint64]struct{})
txn.pendingWrites = make(map[string]*Entry)
}
if !isManaged {
Expand Down

0 comments on commit deb1181

Please sign in to comment.