From deb118101619d4a7349d7061f59a253b70307faf Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Fri, 2 Oct 2020 02:52:12 +0530 Subject: [PATCH] Revert "Support disabling conflict detection (#1344)" This reverts commit 056d859606380082d8694d2ad3b3ebf4b10318d7. --- options.go | 43 +++++++++++++++++++++++++++-------------- txn.go | 56 ++++++++++++++++++++++-------------------------------- 2 files changed, 52 insertions(+), 47 deletions(-) diff --git a/options.go b/options.go index a0414a538..14ad9dd09 100644 --- a/options.go +++ b/options.go @@ -93,10 +93,11 @@ type Options struct { // ChecksumVerificationMode decides when db should verify checksums for SSTable blocks. ChecksumVerificationMode options.ChecksumVerificationMode - // DetectConflicts determines whether the transactions would be checked for - // conflicts. The transactions can be processed at a higher rate when - // conflict detection is disabled. - DetectConflicts bool + // KeepBlockIndicesInCache decides whether to keep the block offsets in the cache or not. + KeepBlockIndicesInCache bool + + // KeepBlocksInCache decides whether to keep the sst blocks in the cache or not. + KeepBlocksInCache bool // Transaction start and commit timestamps are managed by end-user. // This is only useful for databases built on top of Badger (like Dgraph). @@ -163,7 +164,8 @@ func DefaultOptions(path string) Options { LogRotatesToFlush: 2, EncryptionKey: []byte{}, EncryptionKeyRotationDuration: 10 * 24 * time.Hour, // Default 10 days. - DetectConflicts: true, + KeepBlocksInCache: false, + KeepBlockIndicesInCache: false, } } @@ -643,16 +645,29 @@ func (opt Options) WithIndexCacheSize(size int64) Options { return opt } -// WithDetectConflicts returns a new Options value with DetectConflicts set to the given value. +// WithKeepBlockIndicesInCache returns a new Option value with KeepBlockOffsetInCache set to the +// given value. +// +// When this option is set badger will store the block offsets in a cache along with the blocks. +// The size of the cache is determined by the MaxCacheSize option.If the MaxCacheSize is set to +// zero, then MaxCacheSize is set to 100 mb. When indices are stored in the cache, the read +// performance might be affected but the cache limits the amount of memory used by the indices. +// +// The default value of KeepBlockOffsetInCache is false. +func (opt Options) WithKeepBlockIndicesInCache(val bool) Options { + opt.KeepBlockIndicesInCache = val + return opt +} + +// WithKeepBlocksInCache returns a new Option value with KeepBlocksInCache set to the +// given value. // -// Detect conflicts options determines if the transactions would be checked for -// conflicts before committing them. When this option is set to false -// (detectConflicts=false) badger can process transactions at a higher rate. -// Setting this options to false might be useful when the user application -// deals with conflict detection and resolution. +// When this option is set badger will store the block in the cache. The size of the cache is +// determined by the MaxCacheSize option.If the MaxCacheSize is set to zero, +// then MaxCacheSize is set to 100 mb. // -// The default value of Detect conflicts is True. -func (opt Options) WithDetectConflicts(b bool) Options { - opt.DetectConflicts = b +// The default value of KeepBlocksInCache is false. +func (opt Options) WithKeepBlocksInCache(val bool) Options { + opt.KeepBlocksInCache = val return opt } diff --git a/txn.go b/txn.go index 21b0db54f..51d9328e0 100644 --- a/txn.go +++ b/txn.go @@ -32,8 +32,7 @@ import ( ) type oracle struct { - isManaged bool // Does not change value, so no locking required. - detectConflicts bool // Determines if the txns should be checked for conflicts. + isManaged bool // Does not change value, so no locking required. sync.Mutex // For nextTxnTs and commits. // writeChLock lock is for ensuring that transactions go to the write @@ -59,15 +58,13 @@ type oracle struct { } type committedTxn struct { - ts uint64 - // ConflictKeys Keeps track of the entries written at timestamp ts. - conflictKeys map[uint64]struct{} + ts uint64 + writes map[uint64]struct{} } func newOracle(opt Options) *oracle { orc := &oracle{ - isManaged: opt.managedTxns, - detectConflicts: opt.DetectConflicts, + isManaged: opt.managedTxns, // We're not initializing nextTxnTs and readOnlyTs. It would be done after replay in Open. // // WaterMarks must be 64-bit aligned for atomic package, hence we must use pointers here. @@ -151,7 +148,7 @@ func (o *oracle) hasConflict(txn *Txn) bool { } for _, ro := range txn.reads { - if _, has := committedTxn.conflictKeys[ro]; has { + if _, has := committedTxn.writes[ro]; has { return true } } @@ -185,12 +182,13 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 { y.AssertTrue(ts >= o.lastCleanupTs) - if o.detectConflicts { - // We should ensure that txns are not added to o.committedTxns slice when - // conflict detection is disabled otherwise this slice would keep growing. + if !o.isManaged { + // We should ensure that txns are not added to o.committedTxns slice in + // managed mode. If the user doesn't set o.discardTs, the commitTxns + // slice would keep growing in managed mode. o.committedTxns = append(o.committedTxns, committedTxn{ - ts: ts, - conflictKeys: txn.conflictKeys, + ts: ts, + writes: txn.writes, }) } @@ -205,9 +203,10 @@ func (o *oracle) doneRead(txn *Txn) { } func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock - if !o.detectConflicts { - // When detectConflicts is set to false, we do not store any - // committedTxns and so there's nothing to clean up. + if o.isManaged { + // In managedMode, we do not store any committedTxns. It is expected + // that the system using badger in managedmode performs it's own + // conflict detection. return } // Same logic as discardAtOrBelow but unlocked @@ -255,8 +254,8 @@ type Txn struct { reads []uint64 // contains fingerprints of keys read. // contains fingerprints of keys written. This is used for conflict detection. - conflictKeys map[uint64]struct{} - readsLock sync.Mutex // guards the reads slice. See addReadKey. + writes map[uint64]struct{} // contains fingerprints of keys written. + readsLock sync.Mutex // guards the reads slice. See addReadKey. pendingWrites map[string]*Entry // cache stores any writes done by txn. duplicateWrites []*Entry // Used in managed mode to store duplicate entries. @@ -384,13 +383,8 @@ func (txn *Txn) modify(e *Entry) error { if err := txn.checkSize(e); err != nil { return err } - - // The txn.conflictKeys is used for conflict detection. If conflict detection - // is disabled, we don't need to store key hashes in this map. - if txn.db.opt.DetectConflicts { - fp := z.MemHash(e.Key) // Avoid dealing with byte arrays. - txn.conflictKeys[fp] = struct{}{} - } + fp := z.MemHash(e.Key) // Avoid dealing with byte arrays. + txn.writes[fp] = struct{}{} // If a duplicate entry was inserted in managed mode, move it to the duplicate writes slice. // Add the entry to duplicateWrites only if both the entries have different versions. For // same versions, we will overwrite the existing entry. @@ -575,7 +569,7 @@ func (txn *Txn) commitAndSend() (func() error, error) { // down to here. So, keep this around for at least a couple of months. // var b strings.Builder // fmt.Fprintf(&b, "Read: %d. Commit: %d. reads: %v. writes: %v. Keys: ", - // txn.readTs, commitTs, txn.reads, txn.conflictKeys) + // txn.readTs, commitTs, txn.reads, txn.writes) for _, e := range txn.pendingWrites { processEntry(e) } @@ -651,9 +645,7 @@ func (txn *Txn) commitPrecheck() error { // If error is nil, the transaction is successfully committed. In case of a non-nil error, the LSM // tree won't be updated, so there's no need for any rollback. func (txn *Txn) Commit() error { - // txn.conflictKeys can be zero if conflict detection is turned off. So we - // should check txn.pendingWrites. - if len(txn.pendingWrites) == 0 { + if len(txn.writes) == 0 { return nil // Nothing to do. } // Precheck before discarding txn. @@ -704,7 +696,7 @@ func (txn *Txn) CommitWith(cb func(error)) { panic("Nil callback provided to CommitWith") } - if len(txn.pendingWrites) == 0 { + if len(txn.writes) == 0 { // Do not run these callbacks from here, because the CommitWith and the // callback might be acquiring the same locks. Instead run the callback // from another goroutine. @@ -771,9 +763,7 @@ func (db *DB) newTransaction(update, isManaged bool) *Txn { size: int64(len(txnKey) + 10), // Some buffer for the extra entry. } if update { - if db.opt.DetectConflicts { - txn.conflictKeys = make(map[uint64]struct{}) - } + txn.writes = make(map[uint64]struct{}) txn.pendingWrites = make(map[string]*Entry) } if !isManaged {