Skip to content

Commit

Permalink
storage: add Reader method to pin iterators at current engine state
Browse files Browse the repository at this point in the history
This is relevant for read evaluation cases where we want to release
latches before evaluation. The new Reader.PinEngineStateForIterators
method would be called before releasing latches.

This pinning does not apply to iterators with timestamp hints,
similar to how ConsistentIterators does not apply to them. So
this does not help ExportRequest evaluation for incremental backups.
To address this we would want a guarantee from the caller that the
timestamp hints will not change for the lifetime of the Reader that
is making a promise of ConsistentIterators.

Informs cockroachdb#55461,cockroachdb#66485

Release note: None
  • Loading branch information
sumeerbhola committed Jun 24, 2021
1 parent a1f969c commit e283da4
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 25 deletions.
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,10 @@ func (s spanSetReader) ConsistentIterators() bool {
return s.r.ConsistentIterators()
}

// PinEngineStateForIterators delegates to the wrapped reader s.r and returns
// its result unchanged; no span checks are performed here.
func (s spanSetReader) PinEngineStateForIterators() error {
return s.r.PinEngineStateForIterators()
}

// GetDBEngine recursively searches for the underlying rocksDB engine.
func GetDBEngine(reader storage.Reader, span roachpb.Span) storage.Reader {
switch v := reader.(type) {
Expand Down
14 changes: 13 additions & 1 deletion pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,20 @@ type Reader interface {
NewEngineIterator(opts IterOptions) EngineIterator
// ConsistentIterators returns true if the Reader implementation guarantees
// that the different iterators constructed by this Reader will see the
// same underlying Engine state.
// same underlying Engine state. NB: this only applies to iterators without
// timestamp hints.
ConsistentIterators() bool
// PinEngineStateForIterators ensures that the state seen by iterators
// without timestamp hints is pinned and will not see future mutations. It
// can be called multiple times on a Reader in which case the state seen
// will be as of the first call. The exception is the implementation that
// uses a Reader returned by Engine.NewSnapshot -- the pinned state is as of
// the time the snapshot was taken. So the guarantee that holds for all
// Readers is that the pinned state is somewhere in the time interval
// between the creation of the Reader and the first call to
// PinEngineStateForIterators.
// REQUIRES: ConsistentIterators returns true.
PinEngineStateForIterators() error
}

// PrecedingIntentState is information needed when writing or clearing an
Expand Down
31 changes: 20 additions & 11 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,11 @@ func (p *Pebble) ConsistentIterators() bool {
return false
}

// PinEngineStateForIterators always returns an assertion-failure error.
// Pebble.ConsistentIterators returns false, so calling this method on a
// *Pebble is treated as a caller bug rather than a request to pin state.
func (p *Pebble) PinEngineStateForIterators() error {
return errors.AssertionFailedf(
"PinEngineStateForIterators must not be called when ConsistentIterators returns false")
}

// ApplyBatchRepr implements the Engine interface.
func (p *Pebble) ApplyBatchRepr(repr []byte, sync bool) error {
// batch.SetRepr takes ownership of the underlying slice, so make a copy.
Expand Down Expand Up @@ -1242,17 +1247,9 @@ type pebbleReadOnly struct {
// Iterator.Clone to use the same underlying engine state. This relies on
// the fact that all pebbleIterators created here are marked as reusable,
// which causes pebbleIterator.Close to not close iter. iter will be closed
// when pebbleReadOnly.Close is called.
//
// TODO(sumeer): The lazy iterator creation is insufficient to address
// issues like https://github.com/cockroachdb/cockroach/issues/55461.
// We could create the pebble.Iterator eagerly, since a caller using
// pebbleReadOnly is eventually going to create one anyway. But we
// already have different behaviors in different Reader implementations
// (see Reader.ConsistentIterators) that callers don't pay attention
// to, and adding another such difference could be a source of bugs.
// See https://github.com/cockroachdb/cockroach/pull/58515#pullrequestreview-563993424
// for more discussion.
// when pebbleReadOnly.Close is called. The caller can change the above
// lazy creation of the *pebble.Iterator that will be cloned by calling
// PinEngineStateForIterators.
prefixIter pebbleIterator
normalIter pebbleIterator
prefixEngineIter pebbleIterator
Expand Down Expand Up @@ -1474,6 +1471,13 @@ func (p *pebbleReadOnly) ConsistentIterators() bool {
return true
}

// PinEngineStateForIterators eagerly creates the base *pebble.Iterator
// (p.iter) from the parent DB when it does not exist yet, instead of waiting
// for the first iterator to be requested. If p.iter is already set, the call
// leaves it untouched. It always returns nil.
func (p *pebbleReadOnly) PinEngineStateForIterators() error {
	if p.iter != nil {
		// A base iterator already exists; keep it.
		return nil
	}
	p.iter = p.parent.db.NewIter(nil)
	return nil
}

// Writer methods are not implemented for pebbleReadOnly. Ideally, the code
// could be refactored so that a Reader could be supplied to evaluateBatch

Expand Down Expand Up @@ -1673,6 +1677,11 @@ func (p pebbleSnapshot) ConsistentIterators() bool {
return true
}

// PinEngineStateForIterators is a no-op for pebbleSnapshot and always returns
// nil.
func (p *pebbleSnapshot) PinEngineStateForIterators() error {
// Snapshot already pins state, so nothing to do.
return nil
}

// pebbleGetProto uses Reader.MVCCGet, so it is not as efficient as a function
// that can unmarshal without copying bytes. But we don't care about
// efficiency, since this is used to implement Reader.MVCCGetProto, which is
Expand Down
15 changes: 14 additions & 1 deletion pkg/storage/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ type pebbleBatch struct {
// Iterator.Clone to use the same underlying engine state. This relies on
// the fact that all pebbleIterators created here are marked as reusable,
// which causes pebbleIterator.Close to not close iter. iter will be closed
// when pebbleBatch.Close is called.
// when pebbleBatch.Close is called. The caller can change the above lazy
// creation of the *pebble.Iterator that will be cloned by calling
// PinEngineStateForIterators.
prefixIter pebbleIterator
normalIter pebbleIterator
prefixEngineIter pebbleIterator
Expand Down Expand Up @@ -299,6 +301,17 @@ func (p *pebbleBatch) ConsistentIterators() bool {
return true
}

// PinEngineStateForIterators eagerly creates the base *pebble.Iterator
// (p.iter) when it does not exist yet, instead of waiting for the first
// iterator to be requested. The iterator is created from the batch when the
// batch is indexed, and from the DB otherwise. If p.iter is already set, the
// call leaves it untouched. It always returns nil.
func (p *pebbleBatch) PinEngineStateForIterators() error {
	switch {
	case p.iter != nil:
		// A base iterator already exists; keep it.
	case p.batch.Indexed():
		p.iter = p.batch.NewIter(nil)
	default:
		p.iter = p.db.NewIter(nil)
	}
	return nil
}

// ApplyBatchRepr implements the Batch interface.
func (p *pebbleBatch) ApplyBatchRepr(repr []byte, sync bool) error {
var batch pebble.Batch
Expand Down
46 changes: 34 additions & 12 deletions pkg/storage/pebble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,17 +457,24 @@ func TestPebbleIterConsistency(t *testing.T) {

roEngine := eng.NewReadOnly()
batch := eng.NewBatch()
roEngine2 := eng.NewReadOnly()
batch2 := eng.NewBatch()

require.False(t, eng.ConsistentIterators())
require.True(t, roEngine.ConsistentIterators())
require.True(t, batch.ConsistentIterators())
require.True(t, roEngine2.ConsistentIterators())
require.True(t, batch2.ConsistentIterators())

// Since an iterator is created on pebbleReadOnly, pebbleBatch before
// writing a newer version of "a", the newer version will not be visible to
// iterators that are created later.
roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close()
batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close()
eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close()
// Pin the state for iterators.
require.Nil(t, roEngine2.PinEngineStateForIterators())
require.Nil(t, batch2.PinEngineStateForIterators())

// Write a newer version of "a"
require.NoError(t, eng.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2")))
Expand Down Expand Up @@ -506,26 +513,41 @@ func TestPebbleIterConsistency(t *testing.T) {
checkMVCCIter(roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true}))
checkMVCCIter(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}))
checkMVCCIter(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true}))
checkMVCCIter(roEngine2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}))
checkMVCCIter(roEngine2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true}))
checkMVCCIter(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}))
checkMVCCIter(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true}))

checkEngineIter(roEngine.NewEngineIterator(IterOptions{UpperBound: []byte("b")}))
checkEngineIter(roEngine.NewEngineIterator(IterOptions{Prefix: true}))
checkEngineIter(batch.NewEngineIterator(IterOptions{UpperBound: []byte("b")}))
checkEngineIter(batch.NewEngineIterator(IterOptions{Prefix: true}))
checkEngineIter(roEngine2.NewEngineIterator(IterOptions{UpperBound: []byte("b")}))
checkEngineIter(roEngine2.NewEngineIterator(IterOptions{Prefix: true}))
checkEngineIter(batch2.NewEngineIterator(IterOptions{UpperBound: []byte("b")}))
checkEngineIter(batch2.NewEngineIterator(IterOptions{Prefix: true}))

// The eng iterator will see both values.
iter := eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})
defer iter.Close()
iter.SeekGE(MVCCKey{Key: []byte("a")})
count := 0
for ; ; iter.Next() {
valid, err := iter.Valid()
require.NoError(t, err)
if !valid {
break
checkIterSeesBothValues := func(iter MVCCIterator) {
iter.SeekGE(MVCCKey{Key: []byte("a")})
count := 0
for ; ; iter.Next() {
valid, err := iter.Valid()
require.NoError(t, err)
if !valid {
break
}
count++
}
count++
require.Equal(t, 2, count)
iter.Close()
}
require.Equal(t, 2, count)
// The eng iterator will see both values.
checkIterSeesBothValues(eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}))
// The indexed batches will see 2 values since the second one is written to the batch.
require.NoError(t, batch.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2")))
require.NoError(t, batch2.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2")))
checkIterSeesBothValues(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}))
checkIterSeesBothValues(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}))
}

func BenchmarkMVCCKeyCompare(b *testing.B) {
Expand Down

0 comments on commit e283da4

Please sign in to comment.