Skip to content

Commit

Permalink
storage: handle multiple iterators in pebbleReadOnly and pebbleBatch
Browse files Browse the repository at this point in the history
Previously, `pebbleReadOnly` and `pebbleBatch` only supported a single
concurrent iterator of a given type. Attempts to create an additional
iterator would panic with "iterator already in use".

This is particularly problematic because a subsequent change will make
more iterators reusable (specifically TBI iterators), such that e.g.
`MVCCIncrementalIterator` will be creating two concurrent reusable
iterators and trigger this panic.

This patch supports multiple iterators by creating new cloned iterators
if the existing iterator is in use. This preserves the pinned engine
state for the cloned iterators too.

Release note: None
  • Loading branch information
erikgrinaker committed Apr 22, 2022
1 parent 4ffbca7 commit 916b26d
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 14 deletions.
4 changes: 2 additions & 2 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1934,7 +1934,7 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions
iter = &p.prefixIter
}
if iter.inuse {
panic("iterator already in use")
return newPebbleIterator(p.parent.db, p.iter, opts, p.durability)
}
// Ensures no timestamp hints etc.
checkOptionsForIterReuse(opts)
Expand Down Expand Up @@ -1970,7 +1970,7 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator {
iter = &p.prefixEngineIter
}
if iter.inuse {
panic("iterator already in use")
return newPebbleIterator(p.parent.db, p.iter, opts, p.durability)
}
// Ensures no timestamp hints etc.
checkOptionsForIterReuse(opts)
Expand Down
24 changes: 12 additions & 12 deletions pkg/storage/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,20 +228,20 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M
if opts.Prefix {
iter = &p.prefixIter
}
handle := pebble.Reader(p.batch)
if !p.batch.Indexed() {
handle = p.db
}
if iter.inuse {
panic("iterator already in use")
return newPebbleIterator(handle, p.iter, opts, StandardDurability)
}
// Ensures no timestamp hints etc.
checkOptionsForIterReuse(opts)

if iter.iter != nil {
iter.setBounds(opts.LowerBound, opts.UpperBound)
} else {
if p.batch.Indexed() {
iter.init(p.batch, p.iter, p.iterUnused, opts, StandardDurability)
} else {
iter.init(p.db, p.iter, p.iterUnused, opts, StandardDurability)
}
iter.init(handle, p.iter, p.iterUnused, opts, StandardDurability)
if p.iter == nil {
// For future cloning.
p.iter = iter.iter
Expand Down Expand Up @@ -271,20 +271,20 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator {
if opts.Prefix {
iter = &p.prefixEngineIter
}
handle := pebble.Reader(p.batch)
if !p.batch.Indexed() {
handle = p.db
}
if iter.inuse {
panic("iterator already in use")
return newPebbleIterator(handle, p.iter, opts, StandardDurability)
}
// Ensures no timestamp hints etc.
checkOptionsForIterReuse(opts)

if iter.iter != nil {
iter.setBounds(opts.LowerBound, opts.UpperBound)
} else {
if p.batch.Indexed() {
iter.init(p.batch, p.iter, p.iterUnused, opts, StandardDurability)
} else {
iter.init(p.db, p.iter, p.iterUnused, opts, StandardDurability)
}
iter.init(handle, p.iter, p.iterUnused, opts, StandardDurability)
if p.iter == nil {
// For future cloning.
p.iter = iter.iter
Expand Down
76 changes: 76 additions & 0 deletions pkg/storage/pebble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -1118,3 +1119,78 @@ func TestPebbleFlushCallbackAndDurabilityRequirement(t *testing.T) {
defer roGuaranteed2.Close()
require.Equal(t, "a1", string(checkGetAndIter(roGuaranteed2)))
}

// TestPebbleReaderMultipleIterators tests that all Pebble readers support
// multiple concurrent iterators of the same type.
func TestPebbleReaderMultipleIterators(t *testing.T) {
defer leaktest.AfterTest(t)()

eng := NewDefaultInMemForTesting()
defer eng.Close()

a1 := MVCCKey{Key: roachpb.Key("a"), Timestamp: hlc.Timestamp{WallTime: 1}}
b1 := MVCCKey{Key: roachpb.Key("b"), Timestamp: hlc.Timestamp{WallTime: 1}}
c1 := MVCCKey{Key: roachpb.Key("c"), Timestamp: hlc.Timestamp{WallTime: 1}}

require.NoError(t, eng.PutMVCC(a1, []byte{1}))
require.NoError(t, eng.PutMVCC(b1, []byte{2}))
require.NoError(t, eng.PutMVCC(c1, []byte{3}))

readOnly := eng.NewReadOnly(StandardDurability)
defer readOnly.Close()
require.NoError(t, readOnly.PinEngineStateForIterators())

snapshot := eng.NewSnapshot()
defer snapshot.Close()
require.NoError(t, snapshot.PinEngineStateForIterators())

batch := eng.NewBatch()
defer batch.Close()
require.NoError(t, batch.PinEngineStateForIterators())

// These writes should not be visible to any of the pinned iterators.
require.NoError(t, eng.PutMVCC(a1, []byte{9}))
require.NoError(t, eng.PutMVCC(b1, []byte{9}))
require.NoError(t, eng.PutMVCC(c1, []byte{9}))

testcases := map[string]Reader{
"Engine": eng,
"ReadOnly": readOnly,
"Snapshot": snapshot,
"Batch": batch,
}
for name, r := range testcases {
t.Run(name, func(t *testing.T) {
// Make sure we can create two iterators of the same type.
i1 := r.NewMVCCIterator(MVCCKeyIterKind, IterOptions{LowerBound: a1.Key, UpperBound: keys.MaxKey})
i2 := r.NewMVCCIterator(MVCCKeyIterKind, IterOptions{LowerBound: b1.Key, UpperBound: keys.MaxKey})

// Make sure the iterators are independent.
i1.SeekGE(a1)
i2.SeekGE(a1)
require.Equal(t, a1, i1.UnsafeKey())
require.Equal(t, b1, i2.UnsafeKey()) // b1 because of LowerBound

// Check iterator consistency.
if r.ConsistentIterators() {
require.Equal(t, []byte{1}, i1.UnsafeValue())
require.Equal(t, []byte{2}, i2.UnsafeValue())
} else {
require.Equal(t, []byte{9}, i1.UnsafeValue())
require.Equal(t, []byte{9}, i2.UnsafeValue())
}

// Closing one iterator shouldn't affect the other.
i1.Close()
i2.Next()
require.Equal(t, c1, i2.UnsafeKey())
i2.Close()

// Quick check for engine iterators too.
e1 := r.NewEngineIterator(IterOptions{UpperBound: keys.MaxKey})
defer e1.Close()
e2 := r.NewEngineIterator(IterOptions{UpperBound: keys.MaxKey})
defer e2.Close()
})
}
}

0 comments on commit 916b26d

Please sign in to comment.