From e228cf5642047d4114aea97e61d10a30c123e119 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Thu, 24 Jun 2021 10:05:49 -0400 Subject: [PATCH] storage: add Reader method to pin iterators at current engine state This is relevant for read evaluation cases where we want to release latches before evaluation. The new Reader.PinEngineStateForIterators method would be called before releasing latches. This pinning does not apply to iterators with timestamp hints, similar to how ConsistentIterators does not apply to them. So this does not help ExportRequest evaluation for incremental backups. To address this we would want a guarantee from the caller that the timestamp hints will not change for the lifetime of the Reader that is making a promise of ConsistentIterators. Informs #55461,#66485 Release note: None --- pkg/kv/kvserver/spanset/batch.go | 16 +++++++---- pkg/storage/engine.go | 27 +++++++++++++++---- pkg/storage/pebble.go | 43 ++++++++++++++++++----------- pkg/storage/pebble_batch.go | 25 ++++++++++++----- pkg/storage/pebble_test.go | 46 +++++++++++++++++++++++--------- 5 files changed, 113 insertions(+), 44 deletions(-) diff --git a/pkg/kv/kvserver/spanset/batch.go b/pkg/kv/kvserver/spanset/batch.go index a09bc41d2e34..2ae3a19634d1 100644 --- a/pkg/kv/kvserver/spanset/batch.go +++ b/pkg/kv/kvserver/spanset/batch.go @@ -402,7 +402,7 @@ func (s spanSetReader) Closed() bool { return s.r.Closed() } -// ExportMVCCToSst is part of the engine.Reader interface. +// ExportMVCCToSst is part of the storage.Reader interface. func (s spanSetReader) ExportMVCCToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, @@ -479,10 +479,16 @@ func (s spanSetReader) NewEngineIterator(opts storage.IterOptions) storage.Engin } } +// ConsistentIterators implements the storage.Reader interface. func (s spanSetReader) ConsistentIterators() bool { return s.r.ConsistentIterators() } +// PinEngineStateForIterators implements the storage.Reader interface. 
+func (s spanSetReader) PinEngineStateForIterators() error { + return s.r.PinEngineStateForIterators() +} + // GetDBEngine recursively searches for the underlying rocksDB engine. func GetDBEngine(reader storage.Reader, span roachpb.Span) storage.Reader { switch v := reader.(type) { @@ -495,7 +501,7 @@ func GetDBEngine(reader storage.Reader, span roachpb.Span) storage.Reader { } } -// getSpanReader is a getter to access the engine.Reader field of the +// getSpanReader is a getter to access the storage.Reader field of the // spansetReader. func getSpanReader(r ReadWriter, span roachpb.Span) storage.Reader { if err := r.spanSetReader.spans.CheckAllowed(SpanReadOnly, span); err != nil { @@ -700,7 +706,7 @@ func makeSpanSetReadWriterAt(rw storage.ReadWriter, spans *SpanSet, ts hlc.Times } } -// NewReadWriterAt returns an engine.ReadWriter that asserts access of the +// NewReadWriterAt returns a storage.ReadWriter that asserts access of the // underlying ReadWriter against the given SpanSet at a given timestamp. // If zero timestamp is provided, accesses are considered non-MVCC. func NewReadWriterAt(rw storage.ReadWriter, spans *SpanSet, ts hlc.Timestamp) storage.ReadWriter { @@ -734,7 +740,7 @@ func (s spanSetBatch) Repr() []byte { return s.b.Repr() } -// NewBatch returns an engine.Batch that asserts access of the underlying +// NewBatch returns a storage.Batch that asserts access of the underlying // Batch against the given SpanSet. We only consider span boundaries, associated // timestamps are not considered. func NewBatch(b storage.Batch, spans *SpanSet) storage.Batch { @@ -746,7 +752,7 @@ func NewBatch(b storage.Batch, spans *SpanSet) storage.Batch { } } -// NewBatchAt returns an engine.Batch that asserts access of the underlying +// NewBatchAt returns a storage.Batch that asserts access of the underlying // Batch against the given SpanSet at the given timestamp. // If the zero timestamp is used, all accesses are considered non-MVCC. 
func NewBatchAt(b storage.Batch, spans *SpanSet, ts hlc.Timestamp) storage.Batch { diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index e112e7fc749f..4ebc94c90103 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -363,9 +363,10 @@ const ( // different iterators created by NewMVCCIterator, NewEngineIterator: // - pebbleSnapshot, because it uses an engine snapshot. // - pebbleReadOnly, pebbleBatch: when the IterOptions do not specify a -// timestamp hint. Note that currently the engine state visible here is -// not as of the time of the Reader creation. It is the time when the -// first iterator is created. +// timestamp hint (see IterOptions). Note that currently the engine state +// visible here is not as of the time of the Reader creation. It is the time +// when the first iterator is created, or earlier if +// PinEngineStateForIterators is called. // The ConsistentIterators method returns true when this consistency is // guaranteed by the Reader. // TODO(sumeer): this partial consistency can be a source of bugs if future @@ -443,9 +444,25 @@ type Reader interface { // after this function returns. NewEngineIterator(opts IterOptions) EngineIterator // ConsistentIterators returns true if the Reader implementation guarantees - // that the different iterators constructed by this Reader will see the - // same underlying Engine state. + // that the different iterators constructed by this Reader will see the same + // underlying Engine state. NB: this only applies to iterators without + // timestamp hints (see IterOptions), i.e., even if this returns true, those + // iterators can be "inconsistent" in terms of seeing a different engine + // state. The only exception to this is a Reader created using NewSnapshot. ConsistentIterators() bool + + // PinEngineStateForIterators ensures that the state seen by iterators + // without timestamp hints (see IterOptions) is pinned and will not see + // future mutations. 
It can be called multiple times on a Reader in which + // case the state seen will be either: + // - As of the first call. + // - For a Reader returned by Engine.NewSnapshot, the pinned state is as of + // the time the snapshot was taken. + // So the semantics that are true for all Readers is that the pinned state + // is somewhere in the time interval between the creation of the Reader and + // the first call to PinEngineStateForIterators. + // REQUIRES: ConsistentIterators returns true. + PinEngineStateForIterators() error } // PrecedingIntentState is information needed when writing or clearing an diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 60658870ca0a..c0e024b0aadf 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -779,6 +779,12 @@ func (p *Pebble) ConsistentIterators() bool { return false } +// PinEngineStateForIterators implements the Engine interface. +func (p *Pebble) PinEngineStateForIterators() error { + return errors.AssertionFailedf( + "PinEngineStateForIterators must not be called when ConsistentIterators returns false") +} + // ApplyBatchRepr implements the Engine interface. func (p *Pebble) ApplyBatchRepr(repr []byte, sync bool) error { // batch.SetRepr takes ownership of the underlying slice, so make a copy. @@ -1237,22 +1243,13 @@ type pebbleReadOnly struct { // need separate iterators for EngineKey and MVCCKey iteration since // iterators that make separated locks/intents look as interleaved need to // use both simultaneously. - // When the first iterator is initialized, the underlying *pebble.Iterator - // is stashed in iter, so that subsequent iterator initialization can use - // Iterator.Clone to use the same underlying engine state. This relies on - // the fact that all pebbleIterators created here are marked as reusable, - // which causes pebbleIterator.Close to not close iter. iter will be closed - // when pebbleReadOnly.Close is called. 
- // - // TODO(sumeer): The lazy iterator creation is insufficient to address - // issues like https://github.com/cockroachdb/cockroach/issues/55461. - // We could create the pebble.Iterator eagerly, since a caller using - // pebbleReadOnly is eventually going to create one anyway. But we - // already have different behaviors in different Reader implementations - // (see Reader.ConsistentIterators) that callers don't pay attention - // to, and adding another such difference could be a source of bugs. - // See https://github.com/cockroachdb/cockroach/pull/58515#pullrequestreview-563993424 - // for more discussion. + // When the first iterator is initialized, or when + // PinEngineStateForIterators is called (whichever happens first), the + // underlying *pebble.Iterator is stashed in iter, so that subsequent + // iterator initialization can use Iterator.Clone to use the same underlying + // engine state. This relies on the fact that all pebbleIterators created + // here are marked as reusable, which causes pebbleIterator.Close to not + // close iter. iter will be closed when pebbleReadOnly.Close is called. prefixIter pebbleIterator normalIter pebbleIterator prefixEngineIter pebbleIterator @@ -1474,6 +1471,14 @@ func (p *pebbleReadOnly) ConsistentIterators() bool { return true } +// PinEngineStateForIterators implements the Reader interface. +func (p *pebbleReadOnly) PinEngineStateForIterators() error { + if p.iter == nil { + p.iter = p.parent.db.NewIter(nil) + } + return nil +} + // Writer methods are not implemented for pebbleReadOnly. Ideally, the code // could be refactored so that a Reader could be supplied to evaluateBatch @@ -1673,6 +1678,12 @@ func (p pebbleSnapshot) ConsistentIterators() bool { return true } +// PinEngineStateForIterators implements the Reader interface. +func (p *pebbleSnapshot) PinEngineStateForIterators() error { + // Snapshot already pins state, so nothing to do. 
+ return nil +} + // pebbleGetProto uses Reader.MVCCGet, so it not as efficient as a function // that can unmarshal without copying bytes. But we don't care about // efficiency, since this is used to implement Reader.MVCCGetProto, which is diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index 19f8715ba5cc..f8ebaff9a4ab 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -37,12 +37,13 @@ type pebbleBatch struct { // need separate iterators for EngineKey and MVCCKey iteration since // iterators that make separated locks/intents look as interleaved need to // use both simultaneously. - // When the first iterator is initialized, the underlying *pebble.Iterator - // is stashed in iter, so that subsequent iterator initialization can use - // Iterator.Clone to use the same underlying engine state. This relies on - // the fact that all pebbleIterators created here are marked as reusable, - // which causes pebbleIterator.Close to not close iter. iter will be closed - // when pebbleBatch.Close is called. + // When the first iterator is initialized, or when + // PinEngineStateForIterators is called (whichever happens first), the + // underlying *pebble.Iterator is stashed in iter, so that subsequent + // iterator initialization can use Iterator.Clone to use the same underlying + // engine state. This relies on the fact that all pebbleIterators created + // here are marked as reusable, which causes pebbleIterator.Close to not + // close iter. iter will be closed when pebbleBatch.Close is called. prefixIter pebbleIterator normalIter pebbleIterator prefixEngineIter pebbleIterator @@ -299,6 +300,18 @@ func (p *pebbleBatch) ConsistentIterators() bool { return true } +// PinEngineStateForIterators implements the Batch interface. 
+func (p *pebbleBatch) PinEngineStateForIterators() error { + if p.iter == nil { + if p.batch.Indexed() { + p.iter = p.batch.NewIter(nil) + } else { + p.iter = p.db.NewIter(nil) + } + } + return nil +} + // NewMVCCIterator implements the Batch interface. func (p *pebbleBatch) ApplyBatchRepr(repr []byte, sync bool) error { var batch pebble.Batch diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go index 55020acba970..cbeadb049250 100644 --- a/pkg/storage/pebble_test.go +++ b/pkg/storage/pebble_test.go @@ -457,10 +457,14 @@ func TestPebbleIterConsistency(t *testing.T) { roEngine := eng.NewReadOnly() batch := eng.NewBatch() + roEngine2 := eng.NewReadOnly() + batch2 := eng.NewBatch() require.False(t, eng.ConsistentIterators()) require.True(t, roEngine.ConsistentIterators()) require.True(t, batch.ConsistentIterators()) + require.True(t, roEngine2.ConsistentIterators()) + require.True(t, batch2.ConsistentIterators()) // Since an iterator is created on pebbleReadOnly, pebbleBatch before // writing a newer version of "a", the newer version will not be visible to @@ -468,6 +472,9 @@ func TestPebbleIterConsistency(t *testing.T) { roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close() batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close() eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close() + // Pin the state for iterators. 
+ require.Nil(t, roEngine2.PinEngineStateForIterators()) + require.Nil(t, batch2.PinEngineStateForIterators()) // Write a newer version of "a" require.NoError(t, eng.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2"))) @@ -506,26 +513,41 @@ func TestPebbleIterConsistency(t *testing.T) { checkMVCCIter(roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) checkMVCCIter(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) checkMVCCIter(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) + checkMVCCIter(roEngine2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + checkMVCCIter(roEngine2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) + checkMVCCIter(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + checkMVCCIter(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) checkEngineIter(roEngine.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) checkEngineIter(roEngine.NewEngineIterator(IterOptions{Prefix: true})) checkEngineIter(batch.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) checkEngineIter(batch.NewEngineIterator(IterOptions{Prefix: true})) + checkEngineIter(roEngine2.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) + checkEngineIter(roEngine2.NewEngineIterator(IterOptions{Prefix: true})) + checkEngineIter(batch2.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) + checkEngineIter(batch2.NewEngineIterator(IterOptions{Prefix: true})) - // The eng iterator will see both values. 
- iter := eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}) - defer iter.Close() - iter.SeekGE(MVCCKey{Key: []byte("a")}) - count := 0 - for ; ; iter.Next() { - valid, err := iter.Valid() - require.NoError(t, err) - if !valid { - break + checkIterSeesBothValues := func(iter MVCCIterator) { + iter.SeekGE(MVCCKey{Key: []byte("a")}) + count := 0 + for ; ; iter.Next() { + valid, err := iter.Valid() + require.NoError(t, err) + if !valid { + break + } + count++ } - count++ + require.Equal(t, 2, count) + iter.Close() } - require.Equal(t, 2, count) + // The eng iterator will see both values. + checkIterSeesBothValues(eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + // The indexed batches will see 2 values since the second one is written to the batch. + require.NoError(t, batch.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2"))) + require.NoError(t, batch2.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2"))) + checkIterSeesBothValues(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + checkIterSeesBothValues(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) } func BenchmarkMVCCKeyCompare(b *testing.B) {