diff --git a/options.go b/options.go index fff7c65ab..17af22354 100644 --- a/options.go +++ b/options.go @@ -93,6 +93,11 @@ type Options struct { // ChecksumVerificationMode decides when db should verify checksums for SSTable blocks. ChecksumVerificationMode options.ChecksumVerificationMode + // DetectConflicts determines whether the transactions would be checked for + // conflicts. The transactions can be processed at a higher rate when + // conflict detection is disabled. + DetectConflicts bool + // KeepBlockIndicesInCache decides whether to keep the block offsets in the cache or not. KeepBlockIndicesInCache bool @@ -163,6 +168,7 @@ func DefaultOptions(path string) Options { LogRotatesToFlush: 2, EncryptionKey: []byte{}, EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days. + DetectConflicts: true, KeepBlocksInCache: false, KeepBlockIndicesInCache: false, } @@ -643,6 +649,20 @@ func (opt Options) WithLoadBloomsOnOpen(b bool) Options { return opt } +// WithDetectConflicts returns a new Options value with DetectConflicts set to the given value. +// +// Detect conflicts options determines if the transactions would be checked for +// conflicts before committing them. When this option is set to false +// (detectConflicts=false) badger can process transactions at a higher rate. +// Setting this options to false might be useful when the user application +// deals with conflict detection and resolution. +// +// The default value of Detect conflicts is True. +func (opt Options) WithDetectConflicts(b bool) Options { + opt.DetectConflicts = b + return opt +} + // WithKeepBlockIndicesInCache returns a new Option value with KeepBlockOffsetInCache set to the // given value. // diff --git a/txn.go b/txn.go index cb531350f..3b16cb9a0 100644 --- a/txn.go +++ b/txn.go @@ -32,7 +32,8 @@ import ( ) type oracle struct { - isManaged bool // Does not change value, so no locking required. + isManaged bool // Does not change value, so no locking required. + detectConflicts bool // Determines if the txns should be checked for conflicts. sync.Mutex // For nextTxnTs and commits. // writeChLock lock is for ensuring that transactions go to the write @@ -58,13 +59,15 @@ type oracle struct { } type committedTxn struct { - ts uint64 - writes map[uint64]struct{} + ts uint64 + // ConflictKeys Keeps track of the entries written at timestamp ts. + conflictKeys map[uint64]struct{} } func newOracle(opt Options) *oracle { orc := &oracle{ - isManaged: opt.managedTxns, + isManaged: opt.managedTxns, + detectConflicts: opt.DetectConflicts, // We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open. // // WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here. @@ -148,7 +151,7 @@ func (o *oracle) hasConflict(txn *Txn) bool { } for _, ro := range txn.reads { - if _, has := committedTxn.writes[ro]; has { + if _, has := committedTxn.conflictKeys[ro]; has { return true } } @@ -182,13 +185,12 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 { y.AssertTrue(ts >= o.lastCleanupTs) - if !o.isManaged { - // We should ensure that txns are not added to o.committedTxns slice in - // managed mode. If the user doesn't set o.discardTs, the commitTxns - // slice would keep growing in managed mode. + if o.detectConflicts { + // We should ensure that txns are not added to o.committedTxns slice when + // conflict detection is disabled otherwise this slice would keep growing. o.committedTxns = append(o.committedTxns, committedTxn{ - ts: ts, - writes: txn.writes, + ts: ts, + conflictKeys: txn.conflictKeys, }) } @@ -203,10 +205,9 @@ func (o *oracle) doneRead(txn *Txn) { } func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock - if o.isManaged { - // In managedMode, we do not store any committedTxns. It is expected - // that the system using badger in managedmode performs it's own - // conflict detection. + if !o.detectConflicts { + // When detectConflicts is set to false, we do not store any + // committedTxns and so there's nothing to clean up. return } // Same logic as discardAtOrBelow but unlocked @@ -249,10 +250,11 @@ type Txn struct { readTs uint64 commitTs uint64 - update bool // update is used to conditionally keep track of reads. - reads []uint64 // contains fingerprints of keys read. - writes map[uint64]struct{} // contains fingerprints of keys written. - readsLock sync.Mutex // guards the reads slice. See addReadKey. + update bool // update is used to conditionally keep track of reads. + reads []uint64 // contains fingerprints of keys read. + // contains fingerprints of keys written. This is used for conflict detection. + conflictKeys map[uint64]struct{} + readsLock sync.Mutex // guards the reads slice. See addReadKey. pendingWrites map[string]*Entry // cache stores any writes done by txn. duplicateWrites []*Entry // Used in managed mode to store duplicate entries. @@ -383,8 +385,13 @@ func (txn *Txn) modify(e *Entry) error { if err := txn.checkSize(e); err != nil { return err } - fp := z.MemHash(e.Key) // Avoid dealing with byte arrays. - txn.writes[fp] = struct{}{} + + // The txn.conflictKeys is used for conflict detection. If conflict detection + // is disabled, we don't need to store key hashes in this map. + if txn.db.opt.DetectConflicts { + fp := z.MemHash(e.Key) // Avoid dealing with byte arrays. + txn.conflictKeys[fp] = struct{}{} + } // If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice. // Add the entry to duplicateWrites only if both the entries have different versions. For // same versions, we will overwrite the existing entry. @@ -570,7 +577,7 @@ func (txn *Txn) commitAndSend() (func() error, error) { // down to here. So, keep this around for at least a couple of months. // var b strings.Builder // fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ", - // txn.readTs, commitTs, txn.reads, txn.writes) + // txn.readTs, commitTs, txn.reads, txn.conflictKeys) for _, e := range txn.pendingWrites { processEntry(e) } @@ -646,7 +653,9 @@ func (txn *Txn) commitPrecheck() error { // If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM // tree won't be updated, so there's no need for any rollback. func (txn *Txn) Commit() error { - if len(txn.writes) == 0 { + // txn.conflictKeys can be zero if conflict detection is turned off. So we + // should check txn.pendingWrites. + if len(txn.pendingWrites) == 0 { return nil // Nothing to do. } // Precheck before discarding txn. @@ -697,7 +706,7 @@ func (txn *Txn) CommitWith(cb func(error)) { panic("Nil callback provided to CommitWith") } - if len(txn.writes) == 0 { + if len(txn.pendingWrites) == 0 { // Do not run these callbacks from here, because the CommitWith and the // callback might be acquiring the same locks. Instead run the callback // from another goroutine. @@ -764,7 +773,9 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn { size: int64(len(txnKey) + 10), // Some buffer for the extra entry. } if update { - txn.writes = make(map[uint64]struct{}) + if db.opt.DetectConflicts { + txn.conflictKeys = make(map[uint64]struct{}) + } txn.pendingWrites = make(map[string]*Entry) } if !isManaged {