From a9ecea984af984a68789a08de9573bb1c3b900f7 Mon Sep 17 00:00:00 2001 From: Damien Tournoud Date: Fri, 10 Jul 2020 09:50:46 -0700 Subject: [PATCH] txn: Do not keep one too many conflict information This fixes an issue introduced in #1275. If the read watermark is, say `10`, it means that all the read transactions of `readTs <= 10` have completed. All the remaining in-flight transactions have `readTs >= 11`, which cannot possibly conflict with transactions of commitTs <= 11. --- txn.go | 11 +++++++++-- txn_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/txn.go b/txn.go index 3b16cb9a0..56b91f1fa 100644 --- a/txn.go +++ b/txn.go @@ -171,7 +171,6 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 { var ts uint64 if !o.isManaged { o.doneRead(txn) - o.cleanupCommittedTransactions() // This is the general case, when user doesn't specify the read and commit ts. ts = o.nextTxnTs @@ -192,6 +191,10 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 { ts: ts, conflictKeys: txn.conflictKeys, }) + + if !o.isManaged { + o.cleanupCommittedTransactions() + } } return ts @@ -229,7 +232,11 @@ func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock tmp := o.committedTxns[:0] for _, txn := range o.committedTxns { - if txn.ts <= maxReadTs { + // If the read watermark is, say 10, it means that all the read transactions + // of readTs <= 10 have completed. Remain in-flight transactions of + // readTs >= 11, which cannot possibly conflict with transactions + // of commitTs <= 11. + if txn.ts <= maxReadTs+1 { continue } tmp = append(tmp, txn) diff --git a/txn_test.go b/txn_test.go index 391f4304a..8c8c12a5b 100644 --- a/txn_test.go +++ b/txn_test.go @@ -28,6 +28,7 @@ import ( "github.com/dgraph-io/badger/v2/options" "github.com/dgraph-io/badger/v2/y" + "github.com/dgraph-io/ristretto/z" "github.com/stretchr/testify/require" ) @@ -944,3 +945,45 @@ func TestConflict(t *testing.T) { runTest(t, testAndSetItr) }) } + +func TestOracleCleanup(t *testing.T) { + runBadgerTest(t, nil, func(t *testing.T, db *DB) { + require := require.New(t) + + doWrite := func(key []byte, expectedReadTs uint64) { + writeTxn := db.NewTransaction(true) + require.Equal(expectedReadTs, writeTxn.readTs) + defer writeTxn.Discard() + + err := writeTxn.Set(key, key) + require.NoError(err) + + err = writeTxn.Commit() + require.NoError(err) + } + + readTxn := db.NewTransaction(false) + + require.Equal(uint64(0), readTxn.readTs) + + key0 := []byte("key0") + doWrite([]byte("key0"), 0) + require.Equal(1, len(db.orc.committedTxns)) + txn := db.orc.committedTxns[0] + require.Equal(1, len(txn.conflictKeys)) + _, ok := txn.conflictKeys[z.MemHash(key0)] + require.True(ok) + + newReadTxn := db.NewTransaction(false) + require.Equal(uint64(1), newReadTxn.readTs) + newReadTxn.Discard() + + readTxn.Discard() + time.Sleep(1 * time.Second) + + key1 := []byte("key1") + doWrite(key1, 1) + + require.Equal(0, len(db.orc.committedTxns)) + }) +}