Skip to content

Commit

Permalink
Support multiple iterators in read-write transactions.
Browse files Browse the repository at this point in the history
This adds support for multiple iterators during a read-write transaction. But iterators created in a read-write
transaction will only be able to see writes that were performed before the iterator was created. Any writes that occur
after the iterator is created will be invisible to the iterator.

This references an issue I opened previously: #981

Added a test case for using multiple concurrent iterators when inside a
read-write transaction. Added a mutex around the reads array on the
transaction to make sure that it is thread safe when it is being
modified by potentially concurrent iterators.
  • Loading branch information
elliotcourant committed Apr 24, 2020
1 parent 4b539b9 commit 4ec5256
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 19 deletions.
40 changes: 30 additions & 10 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,21 +815,41 @@ func TestIterateParallel(t *testing.T) {
// Check that a RW txn can now run multiple iterators.
txn := db.NewTransaction(true)
itr := txn.NewIterator(DefaultIteratorOptions)
require.Panics(t, func() {
txn.NewIterator(DefaultIteratorOptions)
require.NotPanics(t, func() {
// Now that multiple iterators are supported in read-write transactions, make sure this does not panic
// anymore. Then just close the iterator.
txn.NewIterator(DefaultIteratorOptions).Close()
})
// The transaction should still panic since there is still one pending iterator that is open.
require.Panics(t, txn.Discard)
itr.Close()
txn.Discard()

// Run multiple iterators for a RO txn.
txn = db.NewTransaction(false)
defer txn.Discard()
wg.Add(3)
go iterate(txn, &wg)
go iterate(txn, &wg)
go iterate(txn, &wg)
wg.Wait()
// (Regression) Make sure that creating multiple concurrent iterators within a read only transaction continues
// to work.
t.Run("multiple read-only iterators", func(t *testing.T) {
// Run multiple iterators for a RO txn.
txn = db.NewTransaction(false)
defer txn.Discard()
wg.Add(3)
go iterate(txn, &wg)
go iterate(txn, &wg)
go iterate(txn, &wg)
wg.Wait()
})

// Make sure that multiple concurrent iterators created within a read-write transaction actually iterate
// successfully.
t.Run("multiple read-write iterators", func(t *testing.T) {
// Run multiple iterators for a RW txn.
txn = db.NewTransaction(true)
defer txn.Discard()
wg.Add(3)
go iterate(txn, &wg)
go iterate(txn, &wg)
go iterate(txn, &wg)
wg.Wait()
})
})
}

Expand Down
12 changes: 6 additions & 6 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,16 +442,16 @@ type Iterator struct {
//
// Multiple Iterators:
// For a read-only txn, multiple iterators can be running simultaneously. However, for a read-write
// txn, only one can be running at one time to avoid race conditions, because Txn is thread-unsafe.
// txn, each iterator is a snapshot of the transaction's writes at the time the iterator was
// created. If writes are performed after an iterator is created, then that iterator
// will not be able to see those writes. Only writes performed before an iterator was created can be
// viewed.
func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
if txn.discarded {
panic("Transaction has already been discarded")
}
// Do not change the order of the next if. We must track the number of running iterators.
if atomic.AddInt32(&txn.numIterators, 1) > 1 && txn.update {
atomic.AddInt32(&txn.numIterators, -1)
panic("Only one iterator can be active at one time, for a RW txn.")
}

atomic.AddInt32(&txn.numIterators, 1)

// TODO: If Prefix is set, only pick those memtables which have keys with
// the prefix.
Expand Down
13 changes: 10 additions & 3 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ type Txn struct {
readTs uint64
commitTs uint64

update bool // update is used to conditionally keep track of reads.
reads []uint64 // contains fingerprints of keys read.
writes []uint64 // contains fingerprints of keys written.
update bool // update is used to conditionally keep track of reads.
readsLock sync.Mutex // make sure only one thread can update the reads list at a time. See addReadKey
reads []uint64 // contains fingerprints of keys read.
writes []uint64 // contains fingerprints of keys written.

pendingWrites map[string]*Entry // cache stores any writes done by txn.

Expand Down Expand Up @@ -431,6 +432,12 @@ func (txn *Txn) Get(key []byte) (item *Item, rerr error) {
// addReadKey appends the fingerprint of key to txn.reads. It does nothing
// when txn.update is false, since reads are only tracked conditionally on
// that flag.
func (txn *Txn) addReadKey(key []byte) {
	if !txn.update {
		return
	}
	fingerprint := z.MemHash(key)

	// Multiple iterators may now be active inside one read-write transaction
	// and can record reads from different goroutines at the same time. The
	// reads slice is not safe for concurrent appends, so every append is
	// serialized through readsLock.
	txn.readsLock.Lock()
	defer txn.readsLock.Unlock()
	txn.reads = append(txn.reads, fingerprint)
}
Expand Down

0 comments on commit 4ec5256

Please sign in to comment.