Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: return error from {MVCC,Engine}Iterator.Value #94025

Merged
merged 1 commit into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func slurpSSTablesLatestKey(
} else if !ok || !it.UnsafeKey().Less(end) {
break
}
val, err := storage.DecodeMVCCValue(it.Value())
val, err := storage.DecodeMVCCValueAndErr(it.Value())
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/ccl/streamingccl/replicationutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,13 @@ func ScanSST(
} else if !valid {
break
}
v, err := pointIter.Value()
if err != nil {
return err
}
if err = mvccKeyValOp(storage.MVCCKeyValue{
Key: pointIter.Key(),
Value: pointIter.Value(),
Value: v,
}); err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,12 @@ func assertExactlyEqualKVs(
// Since the iterator goes from latest to older versions, we compare
// starting from the end of the slice that is sorted by timestamp.
latestVersionInChain := valueTimestampTuples[len(valueTimestampTuples)-1]
v, err := it.Value()
require.NoError(t, err)
require.Equal(t, roachpb.KeyValue{
Key: it.Key().Key,
Value: roachpb.Value{
RawBytes: it.Value(),
RawBytes: v,
Timestamp: it.Key().Timestamp,
},
}, latestVersionInChain)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,10 +404,12 @@ func assertEqualKVs(
// Since the iterator goes from latest to older versions, we compare
// starting from the end of the slice that is sorted by timestamp.
latestVersionInChain := valueTimestampTuples[len(valueTimestampTuples)-1]
v, err := it.Value()
require.NoError(t, err)
require.Equal(t, roachpb.KeyValue{
Key: it.Key().Key,
Value: roachpb.Value{
RawBytes: it.Value(),
RawBytes: v,
Timestamp: it.Key().Timestamp,
},
}, latestVersionInChain)
Expand Down
12 changes: 10 additions & 2 deletions pkg/kv/kvnemesis/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ func (e *Engine) Get(key roachpb.Key, ts hlc.Timestamp) roachpb.Value {
return roachpb.Value{}
}
var valCopy []byte
e.b, valCopy = e.b.Copy(iter.Value(), 0 /* extraCap */)
v, err := iter.ValueAndErr()
if err != nil {
panic(err)
}
e.b, valCopy = e.b.Copy(v, 0 /* extraCap */)
mvccVal, err := storage.DecodeMVCCValue(valCopy)
if err != nil {
panic(err)
Expand Down Expand Up @@ -128,7 +132,11 @@ func (e *Engine) Iterate(
hasPoint, _ := iter.HasPointAndRange()
var keyCopy, valCopy []byte
e.b, keyCopy = e.b.Copy(iter.Key(), 0 /* extraCap */)
e.b, valCopy = e.b.Copy(iter.Value(), 0 /* extraCap */)
v, err := iter.ValueAndErr()
if err != nil {
fn(nil, nil, hlc.Timestamp{}, nil, err)
}
e.b, valCopy = e.b.Copy(v, 0 /* extraCap */)
if hasPoint {
key, err := storage.DecodeMVCCKey(keyCopy)
if err != nil {
Expand Down
12 changes: 10 additions & 2 deletions pkg/kv/kvserver/gc/gc_old_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,11 @@ func runGCOld(
expBaseKey = iterKey.Key
if !iterKey.IsValue() {
keys = []storage.MVCCKey{iter.Key()}
vals = [][]byte{iter.Value()}
v, err := iter.Value()
if err != nil {
return Info{}, err
}
vals = [][]byte{v}
continue
}
// An implicit metadata.
Expand All @@ -203,8 +207,12 @@ func runGCOld(
// determine that there is no intent.
vals = [][]byte{nil}
}
v, err := iter.Value()
if err != nil {
return Info{}, err
}
keys = append(keys, iter.Key())
vals = append(vals, iter.Value())
vals = append(vals, v)
}
// Handle last collected set of keys/vals.
processKeysAndValues()
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/gc/gc_random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,8 @@ func getExpectationsGenerator(
p, r := it.HasPointAndRange()
if p {
k := it.Key()
v := it.Value()
v, err := it.Value()
require.NoError(t, err)
if len(baseKey) == 0 {
baseKey = k.Key
// We are only interested in range tombstones covering current point,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rditer/replica_data_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (ri *ReplicaMVCCDataIterator) Key() storage.MVCCKey {
}

// Value returns the current value. Only called in tests.
func (ri *ReplicaMVCCDataIterator) Value() []byte {
func (ri *ReplicaMVCCDataIterator) Value() ([]byte, error) {
return ri.it.Value()
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2301,9 +2301,13 @@ func (r *Replica) printRaftTail(
if err != nil {
return sb.String(), err
}
v, err := it.Value()
if err != nil {
return sb.String(), err
}
kv := storage.MVCCKeyValue{
Key: mvccKey,
Value: it.Value(),
Value: v,
}
sb.WriteString(truncateEntryString(SprintMVCCKeyValue(kv, true /* printKey */), 2000))
sb.WriteRune('\n')
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (i *MVCCIterator) Key() storage.MVCCKey {
}

// Value is part of the storage.MVCCIterator interface.
func (i *MVCCIterator) Value() []byte {
func (i *MVCCIterator) Value() ([]byte, error) {
return i.i.Value()
}

Expand Down Expand Up @@ -391,7 +391,7 @@ func (i *EngineIterator) EngineKey() (storage.EngineKey, error) {
}

// Value is part of the storage.EngineIterator interface.
func (i *EngineIterator) Value() []byte {
func (i *EngineIterator) Value() ([]byte, error) {
return i.i.Value()
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/catalog/lease/kv_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ func getRawHistoryKVs(
k := it.Key()
suffix, _, err := codec.DecodeTablePrefix(k.Key)
require.NoError(t, err)
v, err := it.Value()
require.NoError(t, err)
row := roachpb.KeyValue{
Key: suffix,
Value: roachpb.Value{
RawBytes: it.Value(),
RawBytes: v,
},
}
row.Value.ClearChecksum()
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/row/fetcher_mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func slurpUserDataKVs(t testing.TB, e storage.Engine) []roachpb.KeyValue {
if !it.UnsafeKey().IsValue() {
return errors.Errorf("found intent key %v", it.UnsafeKey())
}
mvccValue, err := storage.DecodeMVCCValue(it.Value())
mvccValue, err := storage.DecodeMVCCValueAndErr(it.Value())
if err != nil {
t.Fatal(err)
}
Expand Down
20 changes: 12 additions & 8 deletions pkg/storage/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,8 +752,12 @@ func TestBatchIteration(t *testing.T) {
if !reflect.DeepEqual(iter.Key(), k1) {
t.Fatalf("expected %s, got %s", k1, iter.Key())
}
if !reflect.DeepEqual(iter.Value(), v1) {
t.Fatalf("expected %s, got %s", v1, iter.Value())
checkValErr := func(v []byte, err error) []byte {
require.NoError(t, err)
return v
}
if !reflect.DeepEqual(checkValErr(iter.Value()), v1) {
t.Fatalf("expected %s, got %s", v1, checkValErr(iter.Value()))
}
iter.Next()
if ok, err := iter.Valid(); !ok {
Expand All @@ -762,8 +766,8 @@ func TestBatchIteration(t *testing.T) {
if !reflect.DeepEqual(iter.Key(), k2) {
t.Fatalf("expected %s, got %s", k2, iter.Key())
}
if !reflect.DeepEqual(iter.Value(), v2) {
t.Fatalf("expected %s, got %s", v2, iter.Value())
if !reflect.DeepEqual(checkValErr(iter.Value()), v2) {
t.Fatalf("expected %s, got %s", v2, checkValErr(iter.Value()))
}
iter.Next()
if ok, err := iter.Valid(); err != nil {
Expand All @@ -780,8 +784,8 @@ func TestBatchIteration(t *testing.T) {
if !reflect.DeepEqual(iter.Key(), k2) {
t.Fatalf("expected %s, got %s", k2, iter.Key())
}
if !reflect.DeepEqual(iter.Value(), v2) {
t.Fatalf("expected %s, got %s", v2, iter.Value())
if !reflect.DeepEqual(checkValErr(iter.Value()), v2) {
t.Fatalf("expected %s, got %s", v2, checkValErr(iter.Value()))
}

iter.Prev()
Expand All @@ -791,8 +795,8 @@ func TestBatchIteration(t *testing.T) {
if !reflect.DeepEqual(iter.Key(), k1) {
t.Fatalf("expected %s, got %s", k1, iter.Key())
}
if !reflect.DeepEqual(iter.Value(), v1) {
t.Fatalf("expected %s, got %s", v1, iter.Value())
if !reflect.DeepEqual(checkValErr(iter.Value()), v1) {
t.Fatalf("expected %s, got %s", v1, checkValErr(iter.Value()))
}
}

Expand Down
18 changes: 13 additions & 5 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ type MVCCIterator interface {
// this seems avoidable, and we should consider cleaning up the callers.
UnsafeRawMVCCKey() []byte
// Value is like UnsafeValue, but returns memory owned by the caller.
Value() []byte
Value() ([]byte, error)
// ValueProto unmarshals the value the iterator is currently
// pointing to using a protobuf decoder.
ValueProto(msg protoutil.Message) error
Expand Down Expand Up @@ -351,7 +351,7 @@ type EngineIterator interface {
UnsafeValue() ([]byte, error)
// Value returns the current value as a byte slice.
// REQUIRES: latest positioning function returned valid=true.
Value() []byte
Value() ([]byte, error)
// GetRawIter is a low-level method only for use in the storage package,
// that returns the underlying pebble Iterator.
GetRawIter() *pebble.Iterator
Expand Down Expand Up @@ -1248,7 +1248,7 @@ func ScanIntents(
return nil, err
}
intents = append(intents, roachpb.MakeIntent(meta.Txn, lockedKey))
intentBytes += int64(len(lockedKey)) + int64(len(iter.Value()))
intentBytes += int64(len(lockedKey)) + int64(len(v))
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -1488,7 +1488,11 @@ func iterateOnReader(

var kv MVCCKeyValue
if hasPoint, _ := it.HasPointAndRange(); hasPoint {
kv = MVCCKeyValue{Key: it.Key(), Value: it.Value()}
v, err := it.Value()
if err != nil {
return err
}
kv = MVCCKeyValue{Key: it.Key(), Value: v}
}
if !it.RangeBounds().Key.Equal(rangeKeys.Bounds.Key) {
rangeKeys = it.RangeKeys().Clone()
Expand Down Expand Up @@ -1675,7 +1679,11 @@ func assertMVCCIteratorInvariants(iter MVCCIterator) error {
if err != nil {
return err
}
if v := iter.Value(); !bytes.Equal(v, u) {
v, err := iter.Value()
if err != nil {
return err
}
if !bytes.Equal(v, u) {
return errors.AssertionFailedf("Value %x does not match UnsafeValue %x at %s", v, u, key)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/intent_interleaving_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ func (i *intentInterleavingIter) Key() MVCCKey {
return key
}

func (i *intentInterleavingIter) Value() []byte {
func (i *intentInterleavingIter) Value() ([]byte, error) {
if i.isCurAtIntentIter() {
return i.intentIter.Value()
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/storage/intent_interleaving_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,11 @@ func checkAndOutputIter(iter MVCCIterator, b *strings.Builder) {
fmt.Fprintf(b, "output: unable to fetch value: %s\n", err.Error())
return
}
v2 := iter.Value()
v2, err := iter.Value()
if err != nil {
fmt.Fprintf(b, "output: unable to fetch value: %s\n", err.Error())
return
}
if !bytes.Equal(v1, v2) {
fmt.Fprintf(b, "output: value: %x != %x\n", v1, v2)
return
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1823,7 +1823,10 @@ func mvccPutInternal(

// NOTE: we use Value instead of UnsafeValue so that we can move the
// iterator below without invalidating this byte slice.
curProvValRaw = iter.Value()
curProvValRaw, err = iter.Value()
if err != nil {
return false, err
}
curIntentVal, err := DecodeMVCCValue(curProvValRaw)
if err != nil {
return false, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/mvcc_history_metamorphic_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (m *metamorphicMVCCIterator) UnsafeRawMVCCKey() []byte {
return m.it.(storage.MVCCIterator).UnsafeRawMVCCKey()
}

func (m *metamorphicMVCCIterator) Value() []byte {
func (m *metamorphicMVCCIterator) Value() ([]byte, error) {
return m.it.(storage.MVCCIterator).Value()
}

Expand Down
10 changes: 8 additions & 2 deletions pkg/storage/mvcc_incremental_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,7 +1357,9 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) {
ek, err := it.EngineKey()
require.NoError(t, err)
require.NoError(t, err)
if err := sst.PutEngineKey(ek, it.Value()); err != nil {
v, err := it.Value()
require.NoError(t, err)
if err := sst.PutEngineKey(ek, v); err != nil {
t.Fatal(err)
}
valid, err = it.NextEngineKey()
Expand Down Expand Up @@ -1483,7 +1485,11 @@ func collectMatchingWithMVCCIterator(
}
ts := iter.Key().Timestamp
if (ts.Less(end) || end == ts) && start.Less(ts) {
expectedKVs = append(expectedKVs, MVCCKeyValue{Key: iter.Key(), Value: iter.Value()})
v, err := iter.Value()
if err != nil {
t.Fatal(err)
}
expectedKVs = append(expectedKVs, MVCCKeyValue{Key: iter.Key(), Value: v})
}
iter.Next()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6773,7 +6773,7 @@ func mvccGetRawWithError(t *testing.T, r Reader, key MVCCKey) ([]byte, error) {
if ok, err := iter.Valid(); err != nil || !ok {
return nil, err
}
return iter.Value(), nil
return iter.Value()
}

func TestMVCCLookupRangeKeyValue(t *testing.T) {
Expand Down
10 changes: 6 additions & 4 deletions pkg/storage/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,12 +667,14 @@ func (p *pebbleIterator) EngineKey() (EngineKey, error) {
}

// Value implements the MVCCIterator and EngineIterator interfaces.
func (p *pebbleIterator) Value() []byte {
// TODO(sumeer): Do not ignore error.
value, _ := p.UnsafeValue()
func (p *pebbleIterator) Value() ([]byte, error) {
value, err := p.UnsafeValue()
if err != nil {
return nil, err
}
valueCopy := make([]byte, len(value))
copy(valueCopy, value)
return valueCopy
return valueCopy, nil
}

// ValueProto implements the MVCCIterator interface.
Expand Down
4 changes: 3 additions & 1 deletion pkg/storage/pebble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,9 @@ func TestPebbleFlushCallbackAndDurabilityRequirement(t *testing.T) {
require.NoError(t, err)
require.Equal(t, v != nil, valid)
if valid {
require.Equal(t, v, iter.Value())
value, err := iter.Value()
require.NoError(t, err)
require.Equal(t, v, value)
}
return v
}
Expand Down
Loading