Skip to content

Commit

Permalink
txn: Do not keep one too many conflict information
Browse files Browse the repository at this point in the history
This fixes an issue introduced in dgraph-io#1275.

If the read watermark is, say `10`, it means that all the read transactions
of `readTs <= 10` have completed. All the remaining in-flight transactions
have `readTs >= 11`, which cannot possibly conflict with transactions
of commitTs <= 11.
  • Loading branch information
damz committed Jul 10, 2020
1 parent a098630 commit a9ecea9
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 2 deletions.
11 changes: 9 additions & 2 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 {
var ts uint64
if !o.isManaged {
o.doneRead(txn)
o.cleanupCommittedTransactions()

// This is the general case, when user doesn't specify the read and commit ts.
ts = o.nextTxnTs
Expand All @@ -192,6 +191,10 @@ func (o *oracle) newCommitTs(txn *Txn) uint64 {
ts: ts,
conflictKeys: txn.conflictKeys,
})

if !o.isManaged {
o.cleanupCommittedTransactions()
}
}

return ts
Expand Down Expand Up @@ -229,7 +232,11 @@ func (o *oracle) cleanupCommittedTransactions() { // Must be called under o.Lock

tmp := o.committedTxns[:0]
for _, txn := range o.committedTxns {
if txn.ts <= maxReadTs {
// If the read watermark is, say 10, it means that all the read transactions
// of readTs <= 10 have completed. Remain in-flight transactions of
// readTs >= 11, which cannot possibly conflict with transactions
// of commitTs <= 11.
if txn.ts <= maxReadTs+1 {
continue
}
tmp = append(tmp, txn)
Expand Down
43 changes: 43 additions & 0 deletions txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/dgraph-io/badger/v2/options"
"github.com/dgraph-io/badger/v2/y"
"github.com/dgraph-io/ristretto/z"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -944,3 +945,45 @@ func TestConflict(t *testing.T) {
runTest(t, testAndSetItr)
})
}

func TestOracleCleanup(t *testing.T) {
runBadgerTest(t, nil, func(t *testing.T, db *DB) {
require := require.New(t)

doWrite := func(key []byte, expectedReadTs uint64) {
writeTxn := db.NewTransaction(true)
require.Equal(expectedReadTs, writeTxn.readTs)
defer writeTxn.Discard()

err := writeTxn.Set(key, key)
require.NoError(err)

err = writeTxn.Commit()
require.NoError(err)
}

readTxn := db.NewTransaction(false)

require.Equal(uint64(0), readTxn.readTs)

key0 := []byte("key0")
doWrite([]byte("key0"), 0)
require.Equal(1, len(db.orc.committedTxns))
txn := db.orc.committedTxns[0]
require.Equal(1, len(txn.conflictKeys))
_, ok := txn.conflictKeys[z.MemHash(key0)]
require.True(ok)

newReadTxn := db.NewTransaction(false)
require.Equal(uint64(1), newReadTxn.readTs)
newReadTxn.Discard()

readTxn.Discard()
time.Sleep(1 * time.Second)

key1 := []byte("key1")
doWrite(key1, 1)

require.Equal(0, len(db.orc.committedTxns))
})
}

0 comments on commit a9ecea9

Please sign in to comment.