Skip to content

Commit

Permalink
backupccl,storage: decode MVCCValue's in RESTORE
Browse files Browse the repository at this point in the history
Previously, we assumed that all bytes in our SST's had no
MVCCValueHeader. Now, we may have MVCCValueHeaders.

Epic: none
Releaste note: None
  • Loading branch information
stevendanna committed Mar 11, 2024
1 parent 6ed2789 commit c6c300c
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 4 deletions.
14 changes: 10 additions & 4 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -554,7 +553,10 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
return summary, err
}
valueScratch = append(valueScratch[:0], v...)
value := roachpb.Value{RawBytes: valueScratch}
value, err := storage.DecodeValueFromMVCCValue(valueScratch)
if err != nil {
return summary, err
}

key.Key, ok, err = kr.RewriteKey(key.Key, key.Timestamp.WallTime)

Expand All @@ -581,8 +583,12 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
if verbose {
log.Infof(ctx, "Put %s -> %s", key.Key, value.PrettyPrint())
}
// TODO(msbutler): ingest the ImportEpoch from an in progress import
if err := batcher.AddMVCCKey(ctx, key, value.RawBytes); err != nil {

// NB: Using valueScratch here assumes that
// DecodeValueFromMVCCValue, ClearChecksum, and
// InitChecksum don't copy/reallocate the slice they
// were given.
if err := batcher.AddMVCCKey(ctx, key, valueScratch); err != nil {
return summary, errors.Wrapf(err, "adding to batch: %s -> %s", key, value.PrettyPrint())
}
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/storage/mvcc_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,32 @@ func DecodeMVCCValue(buf []byte) (MVCCValue, error) {
return decodeExtendedMVCCValue(buf)
}

// DecodeValueFromMVCCValue decodes and MVCCValue and returns the
// roachpb.Value portion without parsing the MVCCValueHeader.
//
// NB: Caller assumes that this function does not copy or re-allocate
// the underlying byte slice.
func DecodeValueFromMVCCValue(buf []byte) (roachpb.Value, error) {
if len(buf) == 0 {
// Tombstone with no header.
return roachpb.Value{}, nil
}
if len(buf) <= tagPos {
return roachpb.Value{}, errMVCCValueMissingTag
}
if buf[tagPos] != extendedEncodingSentinel {
return roachpb.Value{RawBytes: buf}, nil
}

// Extended encoding
headerLen := binary.BigEndian.Uint32(buf)
headerSize := extendedPreludeSize + headerLen
if len(buf) < int(headerSize) {
return roachpb.Value{}, errMVCCValueMissingHeader
}
return roachpb.Value{RawBytes: buf[headerSize:]}, nil
}

// DecodeMVCCValueAndErr is a helper that can be called using the ([]byte,
// error) pair returned from the iterator UnsafeValue(), Value() methods.
func DecodeMVCCValueAndErr(buf []byte, err error) (MVCCValue, error) {
Expand Down
47 changes: 47 additions & 0 deletions pkg/storage/mvcc_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,24 @@ func TestEncodeDecodeMVCCValue(t *testing.T) {

return buf.String()
}))
t.Run("DeocdeValueFromMVCCValue/"+name, func(t *testing.T) {
enc, err := EncodeMVCCValue(tc.val)
require.NoError(t, err)
assert.Equal(t, encodedMVCCValueSize(tc.val), len(enc))

dec, err := DecodeValueFromMVCCValue(enc)
require.NoError(t, err)

if len(dec.RawBytes) == 0 {
dec.RawBytes = nil // normalize
}

require.Equal(t, tc.val.Value, dec)
require.Equal(t, tc.val.IsTombstone(), len(dec.RawBytes) == 0)
isTombstone, err := EncodedMVCCValueIsTombstone(enc)
require.NoError(t, err)
require.Equal(t, tc.val.IsTombstone(), isTombstone)
})
}
}

Expand All @@ -233,6 +251,14 @@ func TestDecodeMVCCValueErrors(t *testing.T) {
require.Equal(t, tc.expect, err)
require.False(t, isTombstone)
})
t.Run("DecodeValueFromMVCCValue/"+name, func(t *testing.T) {
dec, err := DecodeValueFromMVCCValue(tc.enc)
require.Equal(t, tc.expect, err)
require.Zero(t, dec)
isTombstone, err := EncodedMVCCValueIsTombstone(tc.enc)
require.Equal(t, tc.expect, err)
require.False(t, isTombstone)
})
}
}

Expand Down Expand Up @@ -316,6 +342,27 @@ func BenchmarkDecodeMVCCValue(b *testing.B) {
}
}

func BenchmarkDecodeValueFromMVCCValue(b *testing.B) {
headers, values := mvccValueBenchmarkConfigs()
for hDesc, h := range headers {
for vDesc, v := range values {
name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc)
mvccValue := MVCCValue{MVCCValueHeader: h, Value: v}
buf, err := EncodeMVCCValue(mvccValue)
require.NoError(b, err)
b.Run(name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
res, err := DecodeValueFromMVCCValue(buf)
if err != nil { // for performance
require.NoError(b, err)
}
_ = res
}
})
}
}
}

func BenchmarkMVCCValueIsTombstone(b *testing.B) {
headers, values := mvccValueBenchmarkConfigs()
for hDesc, h := range headers {
Expand Down

0 comments on commit c6c300c

Please sign in to comment.