Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
120354: storage: reduce allocations when encoding MVCCValue's  r=dt a=stevendanna

This adds a new function that lets us pass a buffer to
use when encoding MVCC values. This is very useful in loops where we
might be allocating many MVCC values in a loop, such as in
ExportRequest or in SSTBatcher.

Before:

```
BenchmarkMVCCExportToSST/importEpochs=false/numKeys=65536/numRevisions=100/exportAllRevisions=true-10                  1        1464855709 ns/op         45293904 B/op      39419 allocs/op
BenchmarkMVCCExportToSST/importEpochs=true/numKeys=65536/numRevisions=100/exportAllRevisions=true-10                   1        1919207875 ns/op        207366256 B/op   6595176 allocs/op
```

After:

```
BenchmarkMVCCExportToSST/importEpochs=false/numKeys=65536/numRevisions=100/exportAllRevisions=true-10                  1        1458935583 ns/op        45327256 B/op      39337 allocs/op
BenchmarkMVCCExportToSST/importEpochs=true/numKeys=65536/numRevisions=100/exportAllRevisions=true-10                   1        1756803250 ns/op        49162648 B/op      41369 allocs/op
```

Epic: none
Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Mar 13, 2024
2 parents aa755fd + e817d29 commit 8da64fe
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 27 deletions.
8 changes: 6 additions & 2 deletions pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ type SSTBatcher struct {

asyncAddSSTs ctxgroup.Group

valueScratch []byte

mu struct {
syncutil.Mutex

Expand Down Expand Up @@ -332,16 +334,17 @@ func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) {
func (b *SSTBatcher) AddMVCCKeyWithImportEpoch(
ctx context.Context, key storage.MVCCKey, value []byte, importEpoch uint32,
) error {

mvccVal, err := storage.DecodeMVCCValue(value)
if err != nil {
return err
}
mvccVal.MVCCValueHeader.ImportEpoch = importEpoch
encVal, err := storage.EncodeMVCCValue(mvccVal)
b.valueScratch, err = storage.EncodeMVCCValueToBuf(mvccVal, b.valueScratch[:0])
if err != nil {
return err
}
return b.AddMVCCKey(ctx, key, encVal)
return b.AddMVCCKey(ctx, key, b.valueScratch)
}

// AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed).
Expand Down Expand Up @@ -427,6 +430,7 @@ func (b *SSTBatcher) Reset(ctx context.Context) {
b.batchEndTimestamp = hlc.Timestamp{}
b.flushKey = nil
b.flushKeyChecked = false
b.valueScratch = b.valueScratch[:0]
b.ms.Reset()

if b.writeAtBatchTS {
Expand Down
43 changes: 33 additions & 10 deletions pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,25 @@ func BenchmarkMVCCExportToSST(b *testing.B) {
runMVCCExportToSST(b, opts)
})
}
withImportEpochs := []bool{false, true}
for _, ie := range withImportEpochs {
numKey := numKeys[len(numKeys)-1]
numRevision := numRevisions[len(numRevisions)-1]
numRangeKey := numRangeKeys[len(numRangeKeys)-1]
exportAllRevisionVal := exportAllRevisions[len(exportAllRevisions)-1]
b.Run(fmt.Sprintf("importEpochs=%t/numKeys=%d/numRevisions=%d/exportAllRevisions=%t",
ie, numKey, numRevision, exportAllRevisionVal,
), func(b *testing.B) {
opts := mvccExportToSSTOpts{
numKeys: numKey,
numRevisions: numRevision,
numRangeKeys: numRangeKey,
exportAllRevisions: exportAllRevisionVal,
importEpochs: ie,
}
runMVCCExportToSST(b, opts)
})
}
}

const numIntentKeys = 1000
Expand Down Expand Up @@ -1678,8 +1697,8 @@ func runMVCCAcquireLockCommon(
}

type mvccExportToSSTOpts struct {
numKeys, numRevisions, numRangeKeys int
exportAllRevisions, useElasticCPUHandle bool
numKeys, numRevisions, numRangeKeys int
importEpochs, exportAllRevisions, useElasticCPUHandle bool

// percentage specifies the share of the dataset to export. 100 will be a full
// export, disabling the TBI optimization. <100 will be an incremental export
Expand Down Expand Up @@ -1741,6 +1760,9 @@ func runMVCCExportToSST(b *testing.B, opts mvccExportToSSTOpts) {
for j := 0; j < opts.numRevisions; j++ {
mvccKey := MVCCKey{Key: key, Timestamp: hlc.Timestamp{WallTime: mkWall(j), Logical: 0}}
mvccValue := MVCCValue{Value: roachpb.MakeValueFromString("foobar")}
if opts.importEpochs {
mvccValue.ImportEpoch = 1
}
err := batch.PutMVCC(mvccKey, mvccValue)
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -1792,14 +1814,15 @@ func runMVCCExportToSST(b *testing.B, opts mvccExportToSSTOpts) {
startTS := hlc.Timestamp{WallTime: startWall}
endTS := hlc.Timestamp{WallTime: endWall}
_, _, err := MVCCExportToSST(ctx, st, engine, MVCCExportOptions{
StartKey: MVCCKey{Key: keys.LocalMax},
EndKey: roachpb.KeyMax,
StartTS: startTS,
EndTS: endTS,
ExportAllRevisions: opts.exportAllRevisions,
TargetSize: 0,
MaxSize: 0,
StopMidKey: false,
StartKey: MVCCKey{Key: keys.LocalMax},
EndKey: roachpb.KeyMax,
StartTS: startTS,
EndTS: endTS,
ExportAllRevisions: opts.exportAllRevisions,
TargetSize: 0,
MaxSize: 0,
StopMidKey: false,
IncludeMVCCValueHeader: opts.importEpochs,
}, &buf)
if err != nil {
b.Fatal(err)
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -7751,6 +7751,7 @@ func mvccExportToWriter(
return maxSize
}

var valueScratch []byte
iter.SeekGE(opts.StartKey)
for {
if ok, err := iter.Valid(); err != nil {
Expand Down Expand Up @@ -7884,11 +7885,12 @@ func mvccExportToWriter(
return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "decoding mvcc value %s", unsafeKey)
}

if opts.IncludeMVCCValueHeader {
unsafeValue, err = EncodeMVCCValueForExport(mvccValue)
if !ok && opts.IncludeMVCCValueHeader {
valueScratch, err = EncodeMVCCValueForExport(mvccValue, valueScratch[:0])
if err != nil {
return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "repackaging imported mvcc value %s", unsafeKey)
}
unsafeValue = valueScratch
} else {
unsafeValue = mvccValue.Value.RawBytes
}
Expand Down
35 changes: 25 additions & 10 deletions pkg/storage/mvcc_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,14 @@ func (v MVCCValue) SafeFormat(w redact.SafePrinter, _ rune) {

// EncodeMVCCValueForExport encodes fields from the MVCCValueHeader
// that are appropriate for export out of the cluster.
func EncodeMVCCValueForExport(mvccValue MVCCValue) ([]byte, error) {
if mvccValue.ImportEpoch == 0 {
//
// NB: Must be updated with ExtendedEncodingSizeForExport.
func EncodeMVCCValueForExport(mvccValue MVCCValue, b []byte) ([]byte, error) {
mvccValue.MVCCValueHeader.LocalTimestamp = hlc.ClockTimestamp{}
if mvccValue.MVCCValueHeader.IsEmpty() {
return mvccValue.Value.RawBytes, nil
}

// We only export ImportEpoch.
mvccValue.MVCCValueHeader = enginepb.MVCCValueHeader{
ImportEpoch: mvccValue.ImportEpoch,
}
return EncodeMVCCValue(mvccValue)
return EncodeMVCCValueToBuf(mvccValue, b)
}

// When running a metamorphic build, disable the simple MVCC value encoding to
Expand Down Expand Up @@ -179,13 +177,24 @@ func encodedMVCCValueSize(v MVCCValue) int {

// EncodeMVCCValue encodes an MVCCValue into its Pebble representation. See the
// comment on MVCCValue for a description of the encoding scheme.
func EncodeMVCCValue(v MVCCValue) ([]byte, error) {
return EncodeMVCCValueToBuf(v, nil)
}

// EncodeMVCCValueToBuf encodes an MVCCValue into its Pebble
// representation. See the comment on MVCCValue for a description of
// the encoding scheme.
//
// If extended encoding is required, the given buffer will be used if
// it is large enough. If the provided buffer is not large enough a
// new buffer is allocated.
//
// TODO(erikgrinaker): This could mid-stack inline when we compared
// v.MVCCValueHeader == enginepb.MVCCValueHeader{} instead of IsEmpty(), but
// struct comparisons have a significant performance regression in Go 1.19 which
// negates the inlining gain. Reconsider this with Go 1.20. See:
// https://github.com/cockroachdb/cockroach/issues/88818
func EncodeMVCCValue(v MVCCValue) ([]byte, error) {
func EncodeMVCCValueToBuf(v MVCCValue, buf []byte) ([]byte, error) {
if v.MVCCValueHeader.IsEmpty() && !disableSimpleValueEncoding {
// Simple encoding. Use the roachpb.Value encoding directly with no
// modification. No need to re-allocate or copy.
Expand All @@ -198,7 +207,13 @@ func EncodeMVCCValue(v MVCCValue) ([]byte, error) {
headerSize := extendedPreludeSize + headerLen
valueSize := headerSize + len(v.Value.RawBytes)

buf := make([]byte, valueSize)
if valueSize > cap(buf) {
buf = make([]byte, valueSize)
} else {
buf = buf[:valueSize]
}
// Extended encoding. Wrap the roachpb.Value encoding with a header containing
// MVCC-level metadata. Requires a copy.
// 4-byte-header-len
binary.BigEndian.PutUint32(buf, uint32(headerLen))
// 1-byte-sentinel
Expand Down
30 changes: 27 additions & 3 deletions pkg/storage/mvcc_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestEncodeMVCCValueForExport(t *testing.T) {
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
encodedVal, err := EncodeMVCCValueForExport(tc.val)
encodedVal, err := EncodeMVCCValueForExport(tc.val, nil)
require.NoError(t, err)
strippedMVCCVal, err := DecodeMVCCValue(encodedVal)
require.NoError(t, err)
Expand Down Expand Up @@ -316,13 +316,37 @@ func BenchmarkEncodeMVCCValueForExport(b *testing.B) {
for vDesc, v := range values {
name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc)
mvccValue := MVCCValue{MVCCValueHeader: h, Value: v}
var buf []byte
b.Run(name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
res, err := EncodeMVCCValueForExport(mvccValue)
var err error
buf, err = EncodeMVCCValueForExport(mvccValue, buf[:0])
if err != nil { // for performance
require.NoError(b, err)
}
_ = res
_ = buf
}
})
}
}
}

func BenchmarkEncodeMVCCValueWithAllocator(b *testing.B) {
DisableMetamorphicSimpleValueEncoding(b)
headers, values := mvccValueBenchmarkConfigs()
for hDesc, h := range headers {
for vDesc, v := range values {
name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc)
mvccValue := MVCCValue{MVCCValueHeader: h, Value: v}
var buf []byte
b.Run(name, func(b *testing.B) {
for i := 0; i < b.N; i++ {
var err error
buf, err = EncodeMVCCValueToBuf(mvccValue, buf[:0])
if err != nil { // for performance
require.NoError(b, err)
}
_ = buf
}
})
}
Expand Down

0 comments on commit 8da64fe

Please sign in to comment.