diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index 903081ecc841..6566d809cba1 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/bulk" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" @@ -554,7 +553,10 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( return summary, err } valueScratch = append(valueScratch[:0], v...) - value := roachpb.Value{RawBytes: valueScratch} + value, err := storage.DecodeValueFromMVCCValue(valueScratch) + if err != nil { + return summary, err + } key.Key, ok, err = kr.RewriteKey(key.Key, key.Timestamp.WallTime) @@ -581,8 +583,12 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( if verbose { log.Infof(ctx, "Put %s -> %s", key.Key, value.PrettyPrint()) } - // TODO(msbutler): ingest the ImportEpoch from an in progress import - if err := batcher.AddMVCCKey(ctx, key, value.RawBytes); err != nil { + + // NB: Using valueScratch here assumes that + // DecodeValueFromMVCCValue, ClearChecksum, and + // InitChecksum don't copy/reallocate the slice they + // were given. + if err := batcher.AddMVCCKey(ctx, key, valueScratch); err != nil { return summary, errors.Wrapf(err, "adding to batch: %s -> %s", key, value.PrettyPrint()) } } diff --git a/pkg/storage/mvcc_value.go b/pkg/storage/mvcc_value.go index 27bb99fad239..868a550f60c4 100644 --- a/pkg/storage/mvcc_value.go +++ b/pkg/storage/mvcc_value.go @@ -234,6 +234,32 @@ func DecodeMVCCValue(buf []byte) (MVCCValue, error) { return decodeExtendedMVCCValue(buf) } +// DecodeValueFromMVCCValue decodes and MVCCValue and returns the +// roachpb.Value portion without parsing the MVCCValueHeader. +// +// NB: Caller assumes that this function does not copy or re-allocate +// the underlying byte slice. +func DecodeValueFromMVCCValue(buf []byte) (roachpb.Value, error) { + if len(buf) == 0 { + // Tombstone with no header. + return roachpb.Value{}, nil + } + if len(buf) <= tagPos { + return roachpb.Value{}, errMVCCValueMissingTag + } + if buf[tagPos] != extendedEncodingSentinel { + return roachpb.Value{RawBytes: buf}, nil + } + + // Extended encoding + headerLen := binary.BigEndian.Uint32(buf) + headerSize := extendedPreludeSize + headerLen + if len(buf) < int(headerSize) { + return roachpb.Value{}, errMVCCValueMissingHeader + } + return roachpb.Value{RawBytes: buf[headerSize:]}, nil +} + // DecodeMVCCValueAndErr is a helper that can be called using the ([]byte, // error) pair returned from the iterator UnsafeValue(), Value() methods. func DecodeMVCCValueAndErr(buf []byte, err error) (MVCCValue, error) { diff --git a/pkg/storage/mvcc_value_test.go b/pkg/storage/mvcc_value_test.go index 6a2084390a3b..bfdaca045cf6 100644 --- a/pkg/storage/mvcc_value_test.go +++ b/pkg/storage/mvcc_value_test.go @@ -211,6 +211,24 @@ func TestEncodeDecodeMVCCValue(t *testing.T) { return buf.String() })) + t.Run("DeocdeValueFromMVCCValue/"+name, func(t *testing.T) { + enc, err := EncodeMVCCValue(tc.val) + require.NoError(t, err) + assert.Equal(t, encodedMVCCValueSize(tc.val), len(enc)) + + dec, err := DecodeValueFromMVCCValue(enc) + require.NoError(t, err) + + if len(dec.RawBytes) == 0 { + dec.RawBytes = nil // normalize + } + + require.Equal(t, tc.val.Value, dec) + require.Equal(t, tc.val.IsTombstone(), len(dec.RawBytes) == 0) + isTombstone, err := EncodedMVCCValueIsTombstone(enc) + require.NoError(t, err) + require.Equal(t, tc.val.IsTombstone(), isTombstone) + }) } } @@ -233,6 +251,14 @@ func TestDecodeMVCCValueErrors(t *testing.T) { require.Equal(t, tc.expect, err) require.False(t, isTombstone) }) + t.Run("DecodeValueFromMVCCValue/"+name, func(t *testing.T) { + dec, err := DecodeValueFromMVCCValue(tc.enc) + require.Equal(t, tc.expect, err) + require.Zero(t, dec) + isTombstone, err := EncodedMVCCValueIsTombstone(tc.enc) + require.Equal(t, tc.expect, err) + require.False(t, isTombstone) + }) } } @@ -316,6 +342,27 @@ func BenchmarkDecodeMVCCValue(b *testing.B) { } } +func BenchmarkDecodeValueFromMVCCValue(b *testing.B) { + headers, values := mvccValueBenchmarkConfigs() + for hDesc, h := range headers { + for vDesc, v := range values { + name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc) + mvccValue := MVCCValue{MVCCValueHeader: h, Value: v} + buf, err := EncodeMVCCValue(mvccValue) + require.NoError(b, err) + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + res, err := DecodeValueFromMVCCValue(buf) + if err != nil { // for performance + require.NoError(b, err) + } + _ = res + } + }) + } + } +} + func BenchmarkMVCCValueIsTombstone(b *testing.B) { headers, values := mvccValueBenchmarkConfigs() for hDesc, h := range headers {