Skip to content

Commit

Permalink
storage: return error from {MVCC,Engine}Iterator.Value
Browse files Browse the repository at this point in the history
Informs cockroachdb/pebble#1170

Epic: CRDB-20378

Release note: None
  • Loading branch information
sumeerbhola committed Dec 21, 2022
1 parent f61c55a commit a269b14
Show file tree
Hide file tree
Showing 24 changed files with 109 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func slurpSSTablesLatestKey(
} else if !ok || !it.UnsafeKey().Less(end) {
break
}
val, err := storage.DecodeMVCCValue(it.Value())
val, err := storage.DecodeMVCCValueAndErr(it.Value())
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/ccl/streamingccl/replicationutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,13 @@ func ScanSST(
} else if !valid {
break
}
v, err := pointIter.Value()
if err != nil {
return err
}
if err = mvccKeyValOp(storage.MVCCKeyValue{
Key: pointIter.Key(),
Value: pointIter.Value(),
Value: v,
}); err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,12 @@ func assertExactlyEqualKVs(
// Since the iterator goes from latest to older versions, we compare
// starting from the end of the slice that is sorted by timestamp.
latestVersionInChain := valueTimestampTuples[len(valueTimestampTuples)-1]
v, err := it.Value()
require.NoError(t, err)
require.Equal(t, roachpb.KeyValue{
Key: it.Key().Key,
Value: roachpb.Value{
RawBytes: it.Value(),
RawBytes: v,
Timestamp: it.Key().Timestamp,
},
}, latestVersionInChain)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,10 +404,12 @@ func assertEqualKVs(
// Since the iterator goes from latest to older versions, we compare
// starting from the end of the slice that is sorted by timestamp.
latestVersionInChain := valueTimestampTuples[len(valueTimestampTuples)-1]
v, err := it.Value()
require.NoError(t, err)
require.Equal(t, roachpb.KeyValue{
Key: it.Key().Key,
Value: roachpb.Value{
RawBytes: it.Value(),
RawBytes: v,
Timestamp: it.Key().Timestamp,
},
}, latestVersionInChain)
Expand Down
12 changes: 10 additions & 2 deletions pkg/kv/kvnemesis/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ func (e *Engine) Get(key roachpb.Key, ts hlc.Timestamp) roachpb.Value {
return roachpb.Value{}
}
var valCopy []byte
e.b, valCopy = e.b.Copy(iter.Value(), 0 /* extraCap */)
v, err := iter.ValueAndErr()
if err != nil {
panic(err)
}
e.b, valCopy = e.b.Copy(v, 0 /* extraCap */)
mvccVal, err := storage.DecodeMVCCValue(valCopy)
if err != nil {
panic(err)
Expand Down Expand Up @@ -128,7 +132,11 @@ func (e *Engine) Iterate(
hasPoint, _ := iter.HasPointAndRange()
var keyCopy, valCopy []byte
e.b, keyCopy = e.b.Copy(iter.Key(), 0 /* extraCap */)
e.b, valCopy = e.b.Copy(iter.Value(), 0 /* extraCap */)
v, err := iter.ValueAndErr()
if err != nil {
fn(nil, nil, hlc.Timestamp{}, nil, err)
}
e.b, valCopy = e.b.Copy(v, 0 /* extraCap */)
if hasPoint {
key, err := storage.DecodeMVCCKey(keyCopy)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions pkg/kv/kvserver/gc/gc_old_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ func runGCOld(
expBaseKey = iterKey.Key
if !iterKey.IsValue() {
keys = []storage.MVCCKey{iter.Key()}
vals = [][]byte{iter.Value()}
v, err := iter.Value()
if err != nil {
return Info{}, err
}
vals = [][]byte{v}
continue
}
// An implicit metadata.
Expand All @@ -203,8 +207,12 @@ func runGCOld(
// determine that there is no intent.
vals = [][]byte{nil}
}
v, err := iter.Value()
if err != nil {
return Info{}, err
}
keys = append(keys, iter.Key())
vals = append(vals, iter.Value())
vals = append(vals, v)
}
// Handle last collected set of keys/vals.
processKeysAndValues()
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/gc/gc_random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,8 @@ func getExpectationsGenerator(
p, r := it.HasPointAndRange()
if p {
k := it.Key()
v := it.Value()
v, err := it.Value()
require.NoError(t, err)
if len(baseKey) == 0 {
baseKey = k.Key
// We are only interested in range tombstones covering current point,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rditer/replica_data_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (ri *ReplicaMVCCDataIterator) Key() storage.MVCCKey {
}

// Value returns the current value. Only called in tests.
func (ri *ReplicaMVCCDataIterator) Value() []byte {
func (ri *ReplicaMVCCDataIterator) Value() ([]byte, error) {
return ri.it.Value()
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2301,9 +2301,13 @@ func (r *Replica) printRaftTail(
if err != nil {
return sb.String(), err
}
v, err := it.Value()
if err != nil {
return sb.String(), err
}
kv := storage.MVCCKeyValue{
Key: mvccKey,
Value: it.Value(),
Value: v,
}
sb.WriteString(truncateEntryString(SprintMVCCKeyValue(kv, true /* printKey */), 2000))
sb.WriteRune('\n')
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (i *MVCCIterator) Key() storage.MVCCKey {
}

// Value is part of the storage.MVCCIterator interface.
func (i *MVCCIterator) Value() []byte {
func (i *MVCCIterator) Value() ([]byte, error) {
return i.i.Value()
}

Expand Down Expand Up @@ -391,7 +391,7 @@ func (i *EngineIterator) EngineKey() (storage.EngineKey, error) {
}

// Value is part of the storage.EngineIterator interface.
func (i *EngineIterator) Value() []byte {
func (i *EngineIterator) Value() ([]byte, error) {
return i.i.Value()
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/catalog/lease/kv_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ func getRawHistoryKVs(
k := it.Key()
suffix, _, err := codec.DecodeTablePrefix(k.Key)
require.NoError(t, err)
v, err := it.Value()
require.NoError(t, err)
row := roachpb.KeyValue{
Key: suffix,
Value: roachpb.Value{
RawBytes: it.Value(),
RawBytes: v,
},
}
row.Value.ClearChecksum()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/row/fetcher_mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func slurpUserDataKVs(t testing.TB, e storage.Engine) []roachpb.KeyValue {
if !it.UnsafeKey().IsValue() {
return errors.Errorf("found intent key %v", it.UnsafeKey())
}
mvccValue, err := storage.DecodeMVCCValue(it.Value())
mvccValue, err := storage.DecodeMVCCValueAndErr(it.Value())
if err != nil {
t.Fatal(err)
}
Expand Down
20 changes: 12 additions & 8 deletions pkg/storage/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,8 +752,12 @@ func TestBatchIteration(t *testing.T) {
if !reflect.DeepEqual(iter.Key(), k1) {
t.Fatalf("expected %s, got %s", k1, iter.Key())
}
if !reflect.DeepEqual(iter.Value(), v1) {
t.Fatalf("expected %s, got %s", v1, iter.Value())
checkValErr := func(v []byte, err error) []byte {
require.NoError(t, err)
return v
}
if !reflect.DeepEqual(checkValErr(iter.Value()), v1) {
t.Fatalf("expected %s, got %s", v1, checkValErr(iter.Value()))
}
iter.Next()
if ok, err := iter.Valid(); !ok {
Expand All @@ -762,8 +766,8 @@ func TestBatchIteration(t *testing.T) {
if !reflect.DeepEqual(iter.Key(), k2) {
t.Fatalf("expected %s, got %s", k2, iter.Key())
}
if !reflect.DeepEqual(iter.Value(), v2) {
t.Fatalf("expected %s, got %s", v2, iter.Value())
if !reflect.DeepEqual(checkValErr(iter.Value()), v2) {
t.Fatalf("expected %s, got %s", v2, checkValErr(iter.Value()))
}
iter.Next()
if ok, err := iter.Valid(); err != nil {
Expand All @@ -780,8 +784,8 @@ func TestBatchIteration(t *testing.T) {
if !reflect.DeepEqual(iter.Key(), k2) {
t.Fatalf("expected %s, got %s", k2, iter.Key())
}
if !reflect.DeepEqual(iter.Value(), v2) {
t.Fatalf("expected %s, got %s", v2, iter.Value())
if !reflect.DeepEqual(checkValErr(iter.Value()), v2) {
t.Fatalf("expected %s, got %s", v2, checkValErr(iter.Value()))
}

iter.Prev()
Expand All @@ -791,8 +795,8 @@ func TestBatchIteration(t *testing.T) {
if !reflect.DeepEqual(iter.Key(), k1) {
t.Fatalf("expected %s, got %s", k1, iter.Key())
}
if !reflect.DeepEqual(iter.Value(), v1) {
t.Fatalf("expected %s, got %s", v1, iter.Value())
if !reflect.DeepEqual(checkValErr(iter.Value()), v1) {
t.Fatalf("expected %s, got %s", v1, checkValErr(iter.Value()))
}
}

Expand Down
18 changes: 13 additions & 5 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ type MVCCIterator interface {
// this seems avoidable, and we should consider cleaning up the callers.
UnsafeRawMVCCKey() []byte
// Value is like UnsafeValue, but returns memory owned by the caller.
Value() []byte
Value() ([]byte, error)
// ValueProto unmarshals the value the iterator is currently
// pointing to using a protobuf decoder.
ValueProto(msg protoutil.Message) error
Expand Down Expand Up @@ -351,7 +351,7 @@ type EngineIterator interface {
UnsafeValue() ([]byte, error)
// Value returns the current value as a byte slice.
// REQUIRES: latest positioning function returned valid=true.
Value() []byte
Value() ([]byte, error)
// GetRawIter is a low-level method only for use in the storage package,
// that returns the underlying pebble Iterator.
GetRawIter() *pebble.Iterator
Expand Down Expand Up @@ -1248,7 +1248,7 @@ func ScanIntents(
return nil, err
}
intents = append(intents, roachpb.MakeIntent(meta.Txn, lockedKey))
intentBytes += int64(len(lockedKey)) + int64(len(iter.Value()))
intentBytes += int64(len(lockedKey)) + int64(len(v))
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -1488,7 +1488,11 @@ func iterateOnReader(

var kv MVCCKeyValue
if hasPoint, _ := it.HasPointAndRange(); hasPoint {
kv = MVCCKeyValue{Key: it.Key(), Value: it.Value()}
v, err := it.Value()
if err != nil {
return err
}
kv = MVCCKeyValue{Key: it.Key(), Value: v}
}
if !it.RangeBounds().Key.Equal(rangeKeys.Bounds.Key) {
rangeKeys = it.RangeKeys().Clone()
Expand Down Expand Up @@ -1675,7 +1679,11 @@ func assertMVCCIteratorInvariants(iter MVCCIterator) error {
if err != nil {
return err
}
if v := iter.Value(); !bytes.Equal(v, u) {
v, err := iter.Value()
if err != nil {
return err
}
if !bytes.Equal(v, u) {
return errors.AssertionFailedf("Value %x does not match UnsafeValue %x at %s", v, u, key)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/intent_interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ func (i *intentInterleavingIter) Key() MVCCKey {
return key
}

func (i *intentInterleavingIter) Value() []byte {
func (i *intentInterleavingIter) Value() ([]byte, error) {
if i.isCurAtIntentIter() {
return i.intentIter.Value()
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/intent_interleaving_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,11 @@ func checkAndOutputIter(iter MVCCIterator, b *strings.Builder) {
fmt.Fprintf(b, "output: unable to fetch value: %s\n", err.Error())
return
}
v2 := iter.Value()
v2, err := iter.Value()
if err != nil {
fmt.Fprintf(b, "output: unable to fetch value: %s\n", err.Error())
return
}
if !bytes.Equal(v1, v2) {
fmt.Fprintf(b, "output: value: %x != %x\n", v1, v2)
return
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1823,7 +1823,10 @@ func mvccPutInternal(

// NOTE: we use Value instead of UnsafeValue so that we can move the
// iterator below without invalidating this byte slice.
curProvValRaw = iter.Value()
curProvValRaw, err = iter.Value()
if err != nil {
return false, err
}
curIntentVal, err := DecodeMVCCValue(curProvValRaw)
if err != nil {
return false, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/mvcc_history_metamorphic_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (m *metamorphicMVCCIterator) UnsafeRawMVCCKey() []byte {
return m.it.(storage.MVCCIterator).UnsafeRawMVCCKey()
}

func (m *metamorphicMVCCIterator) Value() []byte {
func (m *metamorphicMVCCIterator) Value() ([]byte, error) {
return m.it.(storage.MVCCIterator).Value()
}

Expand Down
10 changes: 8 additions & 2 deletions pkg/storage/mvcc_incremental_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,7 +1357,9 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) {
ek, err := it.EngineKey()
require.NoError(t, err)
require.NoError(t, err)
if err := sst.PutEngineKey(ek, it.Value()); err != nil {
v, err := it.Value()
require.NoError(t, err)
if err := sst.PutEngineKey(ek, v); err != nil {
t.Fatal(err)
}
valid, err = it.NextEngineKey()
Expand Down Expand Up @@ -1483,7 +1485,11 @@ func collectMatchingWithMVCCIterator(
}
ts := iter.Key().Timestamp
if (ts.Less(end) || end == ts) && start.Less(ts) {
expectedKVs = append(expectedKVs, MVCCKeyValue{Key: iter.Key(), Value: iter.Value()})
v, err := iter.Value()
if err != nil {
t.Fatal(err)
}
expectedKVs = append(expectedKVs, MVCCKeyValue{Key: iter.Key(), Value: v})
}
iter.Next()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6773,7 +6773,7 @@ func mvccGetRawWithError(t *testing.T, r Reader, key MVCCKey) ([]byte, error) {
if ok, err := iter.Valid(); err != nil || !ok {
return nil, err
}
return iter.Value(), nil
return iter.Value()
}

func TestMVCCLookupRangeKeyValue(t *testing.T) {
Expand Down
10 changes: 6 additions & 4 deletions pkg/storage/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,12 +667,14 @@ func (p *pebbleIterator) EngineKey() (EngineKey, error) {
}

// Value implements the MVCCIterator and EngineIterator interfaces.
func (p *pebbleIterator) Value() []byte {
// TODO(sumeer): Do not ignore error.
value, _ := p.UnsafeValue()
func (p *pebbleIterator) Value() ([]byte, error) {
value, err := p.UnsafeValue()
if err != nil {
return nil, err
}
valueCopy := make([]byte, len(value))
copy(valueCopy, value)
return valueCopy
return valueCopy, nil
}

// ValueProto implements the MVCCIterator interface.
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/pebble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,9 @@ func TestPebbleFlushCallbackAndDurabilityRequirement(t *testing.T) {
require.NoError(t, err)
require.Equal(t, v != nil, valid)
if valid {
require.Equal(t, v, iter.Value())
value, err := iter.Value()
require.NoError(t, err)
require.Equal(t, v, value)
}
return v
}
Expand Down
Loading

0 comments on commit a269b14

Please sign in to comment.