storage: add Reader method to pin iterators at current engine state #66845

Merged: 1 commit, Jun 28, 2021
16 changes: 11 additions & 5 deletions pkg/kv/kvserver/spanset/batch.go
@@ -402,7 +402,7 @@ func (s spanSetReader) Closed() bool {
return s.r.Closed()
}

// ExportMVCCToSst is part of the engine.Reader interface.
// ExportMVCCToSst is part of the storage.Reader interface.
func (s spanSetReader) ExportMVCCToSst(
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
@@ -479,10 +479,16 @@ func (s spanSetReader) NewEngineIterator(opts storage.IterOptions) storage.Engin
}
}

// ConsistentIterators implements the storage.Reader interface.
func (s spanSetReader) ConsistentIterators() bool {
return s.r.ConsistentIterators()
}

// PinEngineStateForIterators implements the storage.Reader interface.
func (s spanSetReader) PinEngineStateForIterators() error {
return s.r.PinEngineStateForIterators()
}

// GetDBEngine recursively searches for the underlying rocksDB engine.
func GetDBEngine(reader storage.Reader, span roachpb.Span) storage.Reader {
switch v := reader.(type) {
@@ -495,7 +501,7 @@ func GetDBEngine(reader storage.Reader, span roachpb.Span) storage.Reader {
}
}

// getSpanReader is a getter to access the engine.Reader field of the
// getSpanReader is a getter to access the storage.Reader field of the
// spansetReader.
func getSpanReader(r ReadWriter, span roachpb.Span) storage.Reader {
if err := r.spanSetReader.spans.CheckAllowed(SpanReadOnly, span); err != nil {
@@ -700,7 +706,7 @@ func makeSpanSetReadWriterAt(rw storage.ReadWriter, spans *SpanSet, ts hlc.Times
}
}

// NewReadWriterAt returns an engine.ReadWriter that asserts access of the
// NewReadWriterAt returns a storage.ReadWriter that asserts access of the
// underlying ReadWriter against the given SpanSet at a given timestamp.
// If zero timestamp is provided, accesses are considered non-MVCC.
func NewReadWriterAt(rw storage.ReadWriter, spans *SpanSet, ts hlc.Timestamp) storage.ReadWriter {
@@ -734,7 +740,7 @@ func (s spanSetBatch) Repr() []byte {
return s.b.Repr()
}

// NewBatch returns an engine.Batch that asserts access of the underlying
// NewBatch returns a storage.Batch that asserts access of the underlying
// Batch against the given SpanSet. We only consider span boundaries, associated
// timestamps are not considered.
func NewBatch(b storage.Batch, spans *SpanSet) storage.Batch {
@@ -746,7 +752,7 @@ func NewBatch(b storage.Batch, spans *SpanSet) storage.Batch {
}
}

// NewBatchAt returns an engine.Batch that asserts access of the underlying
// NewBatchAt returns a storage.Batch that asserts access of the underlying
// Batch against the given SpanSet at the given timestamp.
// If the zero timestamp is used, all accesses are considered non-MVCC.
func NewBatchAt(b storage.Batch, spans *SpanSet, ts hlc.Timestamp) storage.Batch {
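The spanset wrapper above simply forwards the new ConsistentIterators and PinEngineStateForIterators methods to the Reader it wraps. For orientation, here is a minimal usage sketch of these wrappers; it is not part of this diff, it assumes the SpanSet.AddNonMVCC method, and the keys are illustrative.

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

// wrapBatch wraps a storage.Batch so that every access is asserted against an
// allowed span. The wrapper delegates ConsistentIterators and
// PinEngineStateForIterators straight to the underlying batch.
func wrapBatch(b storage.Batch) storage.Batch {
	var spans spanset.SpanSet
	// Only keys in [a, c) may be read or written through the returned batch.
	spans.AddNonMVCC(spanset.SpanReadWrite, roachpb.Span{
		Key:    roachpb.Key("a"),
		EndKey: roachpb.Key("c"),
	})
	return spanset.NewBatch(b, &spans)
}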
27 changes: 22 additions & 5 deletions pkg/storage/engine.go
@@ -363,9 +363,10 @@ const (
// different iterators created by NewMVCCIterator, NewEngineIterator:
// - pebbleSnapshot, because it uses an engine snapshot.
// - pebbleReadOnly, pebbleBatch: when the IterOptions do not specify a
// timestamp hint. Note that currently the engine state visible here is
// not as of the time of the Reader creation. It is the time when the
// first iterator is created.
// timestamp hint (see IterOptions). Note that currently the engine state
// visible here is not as of the time of the Reader creation. It is the time
// when the first iterator is created, or earlier if
// PinEngineStateForIterators is called.
// The ConsistentIterators method returns true when this consistency is
// guaranteed by the Reader.
// TODO(sumeer): this partial consistency can be a source of bugs if future
@@ -443,9 +444,25 @@ type Reader interface {
// after this function returns.
NewEngineIterator(opts IterOptions) EngineIterator
// ConsistentIterators returns true if the Reader implementation guarantees
// that the different iterators constructed by this Reader will see the
// same underlying Engine state.
// that the different iterators constructed by this Reader will see the same
// underlying Engine state. NB: this only applies to iterators without
// timestamp hints (see IterOptions), i.e., even if this returns true, those
// iterators can be "inconsistent" in terms of seeing a different engine
// state. The only exception to this is a Reader created using NewSnapshot.
ConsistentIterators() bool

// PinEngineStateForIterators ensures that the state seen by iterators
// without timestamp hints (see IterOptions) is pinned and will not see
// future mutations. It can be called multiple times on a Reader in which
// case the state seen will be either:
// - As of the first call.
// - For a Reader returned by Engine.NewSnapshot, the pinned state is as of
// the time the snapshot was taken.
// So the semantics that are true for all Readers are that the pinned state
// is somewhere in the time interval between the creation of the Reader and
// the first call to PinEngineStateForIterators.
// REQUIRES: ConsistentIterators returns true.
PinEngineStateForIterators() error
}

// PrecedingIntentState is information needed when writing or clearing an
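To make the new Reader contract concrete, the following caller-side sketch (not part of this diff; the function name, key bounds, and processing are illustrative) pins the engine state on a Reader that advertises consistent iterators, so an iterator created later does not observe writes applied in the meantime.

package example

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

// pinThenIterate pins the Reader's engine state up front, so an iterator
// created afterwards still sees the state as of the pin.
func pinThenIterate(reader storage.Reader) error {
	// PinEngineStateForIterators requires ConsistentIterators() == true,
	// e.g. a Reader from Engine.NewReadOnly, NewBatch, or NewSnapshot; a bare
	// Engine returns false and would reject the call.
	if !reader.ConsistentIterators() {
		return nil
	}
	if err := reader.PinEngineStateForIterators(); err != nil {
		return err
	}
	// Engine writes that land after this point are not visible below.
	iter := reader.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
		UpperBound: roachpb.Key("z"),
	})
	defer iter.Close()
	for iter.SeekGE(storage.MVCCKey{Key: roachpb.Key("a")}); ; iter.Next() {
		ok, err := iter.Valid()
		if err != nil || !ok {
			return err
		}
		_ = iter.UnsafeKey() // process the pinned view of the data
	}
}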
43 changes: 27 additions & 16 deletions pkg/storage/pebble.go
@@ -779,6 +779,12 @@ func (p *Pebble) ConsistentIterators() bool {
return false
}

// PinEngineStateForIterators implements the Engine interface.
func (p *Pebble) PinEngineStateForIterators() error {
return errors.AssertionFailedf(
"PinEngineStateForIterators must not be called when ConsistentIterators returns false")
}

// ApplyBatchRepr implements the Engine interface.
func (p *Pebble) ApplyBatchRepr(repr []byte, sync bool) error {
// batch.SetRepr takes ownership of the underlying slice, so make a copy.
@@ -1237,22 +1243,13 @@ type pebbleReadOnly struct {
// need separate iterators for EngineKey and MVCCKey iteration since
// iterators that make separated locks/intents look as interleaved need to
// use both simultaneously.
// When the first iterator is initialized, the underlying *pebble.Iterator
// is stashed in iter, so that subsequent iterator initialization can use
// Iterator.Clone to use the same underlying engine state. This relies on
// the fact that all pebbleIterators created here are marked as reusable,
// which causes pebbleIterator.Close to not close iter. iter will be closed
// when pebbleReadOnly.Close is called.
//
// TODO(sumeer): The lazy iterator creation is insufficient to address
// issues like https://github.com/cockroachdb/cockroach/issues/55461.
// We could create the pebble.Iterator eagerly, since a caller using
// pebbleReadOnly is eventually going to create one anyway. But we
// already have different behaviors in different Reader implementations
// (see Reader.ConsistentIterators) that callers don't pay attention
// to, and adding another such difference could be a source of bugs.
// See https://github.com/cockroachdb/cockroach/pull/58515#pullrequestreview-563993424
// for more discussion.
// When the first iterator is initialized, or when
// PinEngineStateForIterators is called (whichever happens first), the
// underlying *pebble.Iterator is stashed in iter, so that subsequent
// iterator initialization can use Iterator.Clone to use the same underlying
// engine state. This relies on the fact that all pebbleIterators created
// here are marked as reusable, which causes pebbleIterator.Close to not
// close iter. iter will be closed when pebbleReadOnly.Close is called.
prefixIter pebbleIterator
normalIter pebbleIterator
prefixEngineIter pebbleIterator
@@ -1474,6 +1471,14 @@ func (p *pebbleReadOnly) ConsistentIterators() bool {
return true
}

// PinEngineStateForIterators implements the Engine interface.
func (p *pebbleReadOnly) PinEngineStateForIterators() error {
if p.iter == nil {
p.iter = p.parent.db.NewIter(nil)
}
return nil
}

// Writer methods are not implemented for pebbleReadOnly. Ideally, the code
// could be refactored so that a Reader could be supplied to evaluateBatch

@@ -1673,6 +1678,12 @@ func (p pebbleSnapshot) ConsistentIterators() bool {
return true
}

// PinEngineStateForIterators implements the Reader interface.
func (p *pebbleSnapshot) PinEngineStateForIterators() error {
// Snapshot already pins state, so nothing to do.
return nil
}

// pebbleGetProto uses Reader.MVCCGet, so it is not as efficient as a function
// that can unmarshal without copying bytes. But we don't care about
// efficiency, since this is used to implement Reader.MVCCGetProto, which is
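The stash-and-Clone mechanism described in the pebbleReadOnly comment can be reduced to the standalone sketch below. This is not the actual implementation: the pinnedReader type is illustrative, and it assumes the pebble API used by this diff, where DB.NewIter returns the iterator directly and Iterator.Clone returns (*Iterator, error).

package example

import "github.com/cockroachdb/pebble"

// pinnedReader mimics the pattern above: the first iterator (or an explicit
// pin) stashes a *pebble.Iterator, and later iterators clone it so that they
// all read the same engine state.
type pinnedReader struct {
	db   *pebble.DB
	iter *pebble.Iterator // stashed on first use or on Pin
}

// Pin eagerly stashes an iterator, fixing the state seen by future iterators.
func (r *pinnedReader) Pin() {
	if r.iter == nil {
		r.iter = r.db.NewIter(nil)
	}
}

// NewIter returns an iterator over the pinned state, cloning the stashed
// iterator (and stashing one first if none exists yet).
func (r *pinnedReader) NewIter() (*pebble.Iterator, error) {
	if r.iter == nil {
		r.iter = r.db.NewIter(nil)
	}
	return r.iter.Clone()
}

// Close releases the stashed iterator.
func (r *pinnedReader) Close() error {
	if r.iter == nil {
		return nil
	}
	return r.iter.Close()
}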
25 changes: 19 additions & 6 deletions pkg/storage/pebble_batch.go
@@ -37,12 +37,13 @@ type pebbleBatch struct {
// need separate iterators for EngineKey and MVCCKey iteration since
// iterators that make separated locks/intents look as interleaved need to
// use both simultaneously.
// When the first iterator is initialized, the underlying *pebble.Iterator
// is stashed in iter, so that subsequent iterator initialization can use
// Iterator.Clone to use the same underlying engine state. This relies on
// the fact that all pebbleIterators created here are marked as reusable,
// which causes pebbleIterator.Close to not close iter. iter will be closed
// when pebbleBatch.Close is called.
// When the first iterator is initialized, or when
// PinEngineStateForIterators is called (whichever happens first), the
// underlying *pebble.Iterator is stashed in iter, so that subsequent
// iterator initialization can use Iterator.Clone to use the same underlying
// engine state. This relies on the fact that all pebbleIterators created
// here are marked as reusable, which causes pebbleIterator.Close to not
// close iter. iter will be closed when pebbleBatch.Close is called.
prefixIter pebbleIterator
normalIter pebbleIterator
prefixEngineIter pebbleIterator
@@ -299,6 +300,18 @@ func (p *pebbleBatch) ConsistentIterators() bool {
return true
}

// PinEngineStateForIterators implements the Batch interface.
func (p *pebbleBatch) PinEngineStateForIterators() error {
if p.iter == nil {
if p.batch.Indexed() {
p.iter = p.batch.NewIter(nil)
} else {
p.iter = p.db.NewIter(nil)
}
}
return nil
}

// NewMVCCIterator implements the Batch interface.
func (p *pebbleBatch) ApplyBatchRepr(repr []byte, sync bool) error {
var batch pebble.Batch
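The Indexed() branch above exists because only an indexed pebble batch can serve reads that include its own writes; a plain batch is write-only, so pinning falls back to an iterator over the underlying DB. A small standalone illustration using pebble directly follows; it is not part of this diff, and it assumes the pebble API version used here (Batch.NewIter returning the iterator directly) plus an in-memory store and illustrative keys.

package main

import (
	"fmt"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	db, err := pebble.Open("demo", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// An indexed batch supports NewIter and sees its own writes, which is why
	// pebbleBatch pins state with batch.NewIter(nil) when Indexed() is true.
	indexed := db.NewIndexedBatch()
	_ = indexed.Set([]byte("a"), []byte("1"), nil)
	it := indexed.NewIter(nil)
	for it.First(); it.Valid(); it.Next() {
		fmt.Printf("%s => %s\n", it.Key(), it.Value())
	}
	_ = it.Close()
	_ = indexed.Close()

	// A plain batch cannot be iterated, so in that case the code above pins
	// an iterator over the underlying DB instead.
	plain := db.NewBatch()
	_ = plain.Set([]byte("b"), []byte("2"), nil)
	_ = plain.Commit(pebble.Sync)
}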
46 changes: 34 additions & 12 deletions pkg/storage/pebble_test.go
@@ -457,17 +457,24 @@ func TestPebbleIterConsistency(t *testing.T) {

roEngine := eng.NewReadOnly()
batch := eng.NewBatch()
roEngine2 := eng.NewReadOnly()
batch2 := eng.NewBatch()

require.False(t, eng.ConsistentIterators())
require.True(t, roEngine.ConsistentIterators())
require.True(t, batch.ConsistentIterators())
require.True(t, roEngine2.ConsistentIterators())
require.True(t, batch2.ConsistentIterators())

// Since an iterator is created on pebbleReadOnly, pebbleBatch before
// writing a newer version of "a", the newer version will not be visible to
// iterators that are created later.
roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close()
batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close()
eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close()
// Pin the state for iterators.
require.Nil(t, roEngine2.PinEngineStateForIterators())
require.Nil(t, batch2.PinEngineStateForIterators())

// Write a newer version of "a"
require.NoError(t, eng.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2")))
@@ -506,26 +513,41 @@ checkMVCCIter(roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}))
checkMVCCIter(roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true}))
checkMVCCIter(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}))
checkMVCCIter(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true}))
checkMVCCIter(roEngine2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}))
checkMVCCIter(roEngine2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true}))
checkMVCCIter(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}))
checkMVCCIter(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true}))

checkEngineIter(roEngine.NewEngineIterator(IterOptions{UpperBound: []byte("b")}))
checkEngineIter(roEngine.NewEngineIterator(IterOptions{Prefix: true}))
checkEngineIter(batch.NewEngineIterator(IterOptions{UpperBound: []byte("b")}))
checkEngineIter(batch.NewEngineIterator(IterOptions{Prefix: true}))
checkEngineIter(roEngine2.NewEngineIterator(IterOptions{UpperBound: []byte("b")}))
checkEngineIter(roEngine2.NewEngineIterator(IterOptions{Prefix: true}))
checkEngineIter(batch2.NewEngineIterator(IterOptions{UpperBound: []byte("b")}))
checkEngineIter(batch2.NewEngineIterator(IterOptions{Prefix: true}))

// The eng iterator will see both values.
iter := eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})
defer iter.Close()
iter.SeekGE(MVCCKey{Key: []byte("a")})
count := 0
for ; ; iter.Next() {
valid, err := iter.Valid()
require.NoError(t, err)
if !valid {
break
checkIterSeesBothValues := func(iter MVCCIterator) {
iter.SeekGE(MVCCKey{Key: []byte("a")})
count := 0
for ; ; iter.Next() {
valid, err := iter.Valid()
require.NoError(t, err)
if !valid {
break
}
count++
}
count++
require.Equal(t, 2, count)
iter.Close()
}
require.Equal(t, 2, count)
// The eng iterator will see both values.
checkIterSeesBothValues(eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}))
// The indexed batches will see 2 values since the second one is written to the batch.
require.NoError(t, batch.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2")))
require.NoError(t, batch2.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2")))
checkIterSeesBothValues(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}))
checkIterSeesBothValues(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}))
}

func BenchmarkMVCCKeyCompare(b *testing.B) {