From abee742f88cc5a971fcbe07bb2a0a0362018790c Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Tue, 12 Mar 2024 17:00:49 +0000 Subject: [PATCH 1/2] storage: reduce allocations when encoding MVCCValue's This adds a new function that lets us pass a pre-allocated buffer to use when encoding MVCC values. This is very useful in loops where we might be allocating many MVCC values in a loop, such as in ExportRequest or in SSTBatcher. Before: ``` BenchmarkMVCCExportToSST/importEpochs=false/numKeys=65536/numRevisions=100/exportAllRevisions=true-10 1 1464855709 ns/op 45293904 B/op 39419 allocs/op BenchmarkMVCCExportToSST/importEpochs=true/numKeys=65536/numRevisions=100/exportAllRevisions=true-10 1 1919207875 ns/op 207366256 B/op 6595176 allocs/op ``` After: ``` BenchmarkMVCCExportToSST/importEpochs=false/numKeys=65536/numRevisions=100/exportAllRevisions=true-10 1 1458935583 ns/op 45327256 B/op 39337 allocs/op BenchmarkMVCCExportToSST/importEpochs=true/numKeys=65536/numRevisions=100/exportAllRevisions=true-10 1 1756803250 ns/op 49162648 B/op 41369 allocs/op ``` Epic: none Release note: None --- pkg/kv/bulk/sst_batcher.go | 8 +++-- pkg/storage/bench_test.go | 43 +++++++++++++++++------ pkg/storage/mvcc.go | 6 ++-- pkg/storage/mvcc_value.go | 62 +++++++++++++++++++++++++++++----- pkg/storage/mvcc_value_test.go | 30 ++++++++++++++-- 5 files changed, 124 insertions(+), 25 deletions(-) diff --git a/pkg/kv/bulk/sst_batcher.go b/pkg/kv/bulk/sst_batcher.go index e19ec6b05a03..c9fe2cbef6a0 100644 --- a/pkg/kv/bulk/sst_batcher.go +++ b/pkg/kv/bulk/sst_batcher.go @@ -193,6 +193,8 @@ type SSTBatcher struct { asyncAddSSTs ctxgroup.Group + valueScratch []byte + mu struct { syncutil.Mutex @@ -332,16 +334,17 @@ func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) { func (b *SSTBatcher) AddMVCCKeyWithImportEpoch( ctx context.Context, key storage.MVCCKey, value []byte, importEpoch uint32, ) error { + mvccVal, err := storage.DecodeMVCCValue(value) if err != nil { return err } mvccVal.MVCCValueHeader.ImportEpoch = importEpoch - encVal, err := storage.EncodeMVCCValue(mvccVal) + b.valueScratch, err = storage.EncodeMVCCValueToBuf(mvccVal, b.valueScratch[:0]) if err != nil { return err } - return b.AddMVCCKey(ctx, key, encVal) + return b.AddMVCCKey(ctx, key, b.valueScratch) } // AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed). @@ -427,6 +430,7 @@ func (b *SSTBatcher) Reset(ctx context.Context) { b.batchEndTimestamp = hlc.Timestamp{} b.flushKey = nil b.flushKeyChecked = false + b.valueScratch = b.valueScratch[:0] b.ms.Reset() if b.writeAtBatchTS { diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go index b272d8bdea6b..107a1f80e1a6 100644 --- a/pkg/storage/bench_test.go +++ b/pkg/storage/bench_test.go @@ -218,6 +218,25 @@ func BenchmarkMVCCExportToSST(b *testing.B) { runMVCCExportToSST(b, opts) }) } + withImportEpochs := []bool{false, true} + for _, ie := range withImportEpochs { + numKey := numKeys[len(numKeys)-1] + numRevision := numRevisions[len(numRevisions)-1] + numRangeKey := numRangeKeys[len(numRangeKeys)-1] + exportAllRevisionVal := exportAllRevisions[len(exportAllRevisions)-1] + b.Run(fmt.Sprintf("importEpochs=%t/numKeys=%d/numRevisions=%d/exportAllRevisions=%t", + ie, numKey, numRevision, exportAllRevisionVal, + ), func(b *testing.B) { + opts := mvccExportToSSTOpts{ + numKeys: numKey, + numRevisions: numRevision, + numRangeKeys: numRangeKey, + exportAllRevisions: exportAllRevisionVal, + importEpochs: ie, + } + runMVCCExportToSST(b, opts) + }) + } } const numIntentKeys = 1000 @@ -1678,8 +1697,8 @@ func runMVCCAcquireLockCommon( } type mvccExportToSSTOpts struct { - numKeys, numRevisions, numRangeKeys int - exportAllRevisions, useElasticCPUHandle bool + numKeys, numRevisions, numRangeKeys int + importEpochs, exportAllRevisions, useElasticCPUHandle bool // percentage specifies the share of the dataset to export. 100 will be a full // export, disabling the TBI optimization. <100 will be an incremental export @@ -1741,6 +1760,9 @@ func runMVCCExportToSST(b *testing.B, opts mvccExportToSSTOpts) { for j := 0; j < opts.numRevisions; j++ { mvccKey := MVCCKey{Key: key, Timestamp: hlc.Timestamp{WallTime: mkWall(j), Logical: 0}} mvccValue := MVCCValue{Value: roachpb.MakeValueFromString("foobar")} + if opts.importEpochs { + mvccValue.ImportEpoch = 1 + } err := batch.PutMVCC(mvccKey, mvccValue) if err != nil { b.Fatal(err) @@ -1792,14 +1814,15 @@ func runMVCCExportToSST(b *testing.B, opts mvccExportToSSTOpts) { startTS := hlc.Timestamp{WallTime: startWall} endTS := hlc.Timestamp{WallTime: endWall} _, _, err := MVCCExportToSST(ctx, st, engine, MVCCExportOptions{ - StartKey: MVCCKey{Key: keys.LocalMax}, - EndKey: roachpb.KeyMax, - StartTS: startTS, - EndTS: endTS, - ExportAllRevisions: opts.exportAllRevisions, - TargetSize: 0, - MaxSize: 0, - StopMidKey: false, + StartKey: MVCCKey{Key: keys.LocalMax}, + EndKey: roachpb.KeyMax, + StartTS: startTS, + EndTS: endTS, + ExportAllRevisions: opts.exportAllRevisions, + TargetSize: 0, + MaxSize: 0, + StopMidKey: false, + IncludeMVCCValueHeader: opts.importEpochs, }, &buf) if err != nil { b.Fatal(err) diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index ab63da1d7116..79386bbc5a0b 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -7751,6 +7751,7 @@ func mvccExportToWriter( return maxSize } + var valueScratch []byte iter.SeekGE(opts.StartKey) for { if ok, err := iter.Valid(); err != nil { @@ -7884,11 +7885,12 @@ func mvccExportToWriter( return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "decoding mvcc value %s", unsafeKey) } - if opts.IncludeMVCCValueHeader { - unsafeValue, err = EncodeMVCCValueForExport(mvccValue) + if !ok && opts.IncludeMVCCValueHeader { + valueScratch, err = EncodeMVCCValueForExport(mvccValue, valueScratch[:0]) if err != nil { return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "repackaging imported mvcc value %s", unsafeKey) } + unsafeValue = valueScratch } else { unsafeValue = mvccValue.Value.RawBytes } diff --git a/pkg/storage/mvcc_value.go b/pkg/storage/mvcc_value.go index 2b8ea7f426f4..06cec556fc3f 100644 --- a/pkg/storage/mvcc_value.go +++ b/pkg/storage/mvcc_value.go @@ -136,16 +136,14 @@ func (v MVCCValue) SafeFormat(w redact.SafePrinter, _ rune) { // EncodeMVCCValueForExport encodes fields from the MVCCValueHeader // that are appropriate for export out of the cluster. -func EncodeMVCCValueForExport(mvccValue MVCCValue) ([]byte, error) { - if mvccValue.ImportEpoch == 0 { +// +// NB: Must be updated with ExtendedEncodingSizeForExport. +func EncodeMVCCValueForExport(mvccValue MVCCValue, b []byte) ([]byte, error) { + mvccValue.MVCCValueHeader.LocalTimestamp = hlc.ClockTimestamp{} + if mvccValue.MVCCValueHeader.IsEmpty() { return mvccValue.Value.RawBytes, nil } - - // We only export ImportEpoch. - mvccValue.MVCCValueHeader = enginepb.MVCCValueHeader{ - ImportEpoch: mvccValue.ImportEpoch, - } - return EncodeMVCCValue(mvccValue) + return EncodeMVCCValueToBuf(mvccValue, b) } // When running a metamorphic build, disable the simple MVCC value encoding to @@ -185,6 +183,8 @@ func encodedMVCCValueSize(v MVCCValue) int { // struct comparisons have a significant performance regression in Go 1.19 which // negates the inlining gain. Reconsider this with Go 1.20. See: // https://github.com/cockroachdb/cockroach/issues/88818 +// +// TODO(ssd): Dedup with EncodeMVCCValueWithAllocator. func EncodeMVCCValue(v MVCCValue) ([]byte, error) { if v.MVCCValueHeader.IsEmpty() && !disableSimpleValueEncoding { // Simple encoding. Use the roachpb.Value encoding directly with no @@ -216,6 +216,52 @@ func EncodeMVCCValue(v MVCCValue) ([]byte, error) { return buf, nil } +// EncodeMVCCValueToBuf encodes an MVCCValue into its Pebble +// representation. See the comment on MVCCValue for a description of +// the encoding scheme. +// +// If extended encoding is required, the given buffer will be used if +// it is large enough. If the provided buffer is not large enough a +// new buffer is allocated. +// +// TODO(ssd): Dedup with EncodeMVCCValue. +func EncodeMVCCValueToBuf(v MVCCValue, buf []byte) ([]byte, error) { + if v.MVCCValueHeader.IsEmpty() && !disableSimpleValueEncoding { + // Simple encoding. Use the roachpb.Value encoding directly with no + // modification. No need to re-allocate or copy. + return v.Value.RawBytes, nil + } + + // Extended encoding. Wrap the roachpb.Value encoding with a header containing + // MVCC-level metadata. Requires a re-allocation and copy. + headerLen := v.MVCCValueHeader.Size() + headerSize := extendedPreludeSize + headerLen + valueSize := headerSize + len(v.Value.RawBytes) + + if valueSize > cap(buf) { + buf = make([]byte, valueSize) + } else { + buf = buf[:valueSize] + } + // Extended encoding. Wrap the roachpb.Value encoding with a header containing + // MVCC-level metadata. Requires a copy. + // 4-byte-header-len + binary.BigEndian.PutUint32(buf, uint32(headerLen)) + // 1-byte-sentinel + buf[tagPos] = extendedEncodingSentinel + // mvcc-header + // + // NOTE: we don't use protoutil to avoid passing v.MVCCValueHeader through + // an interface, which would cause a heap allocation and incur the cost of + // dynamic dispatch. + if _, err := v.MVCCValueHeader.MarshalToSizedBuffer(buf[extendedPreludeSize:headerSize]); err != nil { + return nil, errors.Wrap(err, "marshaling MVCCValueHeader") + } + // <4-byte-checksum><1-byte-tag> or empty for tombstone + copy(buf[headerSize:], v.Value.RawBytes) + return buf, nil +} + // DecodeMVCCValue decodes an MVCCKey from its Pebble representation. // // NOTE: this function does not inline, so it is not suitable for performance diff --git a/pkg/storage/mvcc_value_test.go b/pkg/storage/mvcc_value_test.go index b5bfd6e4151a..9ba0c00e1250 100644 --- a/pkg/storage/mvcc_value_test.go +++ b/pkg/storage/mvcc_value_test.go @@ -105,7 +105,7 @@ func TestEncodeMVCCValueForExport(t *testing.T) { } for name, tc := range testcases { t.Run(name, func(t *testing.T) { - encodedVal, err := EncodeMVCCValueForExport(tc.val) + encodedVal, err := EncodeMVCCValueForExport(tc.val, nil) require.NoError(t, err) strippedMVCCVal, err := DecodeMVCCValue(encodedVal) require.NoError(t, err) @@ -316,13 +316,37 @@ func BenchmarkEncodeMVCCValueForExport(b *testing.B) { for vDesc, v := range values { name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc) mvccValue := MVCCValue{MVCCValueHeader: h, Value: v} + var buf []byte b.Run(name, func(b *testing.B) { for i := 0; i < b.N; i++ { - res, err := EncodeMVCCValueForExport(mvccValue) + var err error + buf, err = EncodeMVCCValueForExport(mvccValue, buf[:0]) if err != nil { // for performance require.NoError(b, err) } - _ = res + _ = buf + } + }) + } + } +} + +func BenchmarkEncodeMVCCValueWithAllocator(b *testing.B) { + DisableMetamorphicSimpleValueEncoding(b) + headers, values := mvccValueBenchmarkConfigs() + for hDesc, h := range headers { + for vDesc, v := range values { + name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc) + mvccValue := MVCCValue{MVCCValueHeader: h, Value: v} + var buf []byte + b.Run(name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + var err error + buf, err = EncodeMVCCValueToBuf(mvccValue, buf[:0]) + if err != nil { // for performance + require.NoError(b, err) + } + _ = buf } }) } From e817d2905ff3d6df426ca0536ac307bd5f8d4c1b Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Tue, 12 Mar 2024 21:42:30 +0000 Subject: [PATCH 2/2] storage: implement EncodeMVCCValue in terms of EncodeMVCCValueToBuf Epic: none Release note: None --- pkg/storage/mvcc_value.go | 43 ++++++--------------------------------- 1 file changed, 6 insertions(+), 37 deletions(-) diff --git a/pkg/storage/mvcc_value.go b/pkg/storage/mvcc_value.go index 06cec556fc3f..e3a8548dad48 100644 --- a/pkg/storage/mvcc_value.go +++ b/pkg/storage/mvcc_value.go @@ -177,43 +177,8 @@ func encodedMVCCValueSize(v MVCCValue) int { // EncodeMVCCValue encodes an MVCCValue into its Pebble representation. See the // comment on MVCCValue for a description of the encoding scheme. -// -// TODO(erikgrinaker): This could mid-stack inline when we compared -// v.MVCCValueHeader == enginepb.MVCCValueHeader{} instead of IsEmpty(), but -// struct comparisons have a significant performance regression in Go 1.19 which -// negates the inlining gain. Reconsider this with Go 1.20. See: -// https://github.com/cockroachdb/cockroach/issues/88818 -// -// TODO(ssd): Dedup with EncodeMVCCValueWithAllocator. func EncodeMVCCValue(v MVCCValue) ([]byte, error) { - if v.MVCCValueHeader.IsEmpty() && !disableSimpleValueEncoding { - // Simple encoding. Use the roachpb.Value encoding directly with no - // modification. No need to re-allocate or copy. - return v.Value.RawBytes, nil - } - - // Extended encoding. Wrap the roachpb.Value encoding with a header containing - // MVCC-level metadata. Requires a re-allocation and copy. - headerLen := v.MVCCValueHeader.Size() - headerSize := extendedPreludeSize + headerLen - valueSize := headerSize + len(v.Value.RawBytes) - - buf := make([]byte, valueSize) - // 4-byte-header-len - binary.BigEndian.PutUint32(buf, uint32(headerLen)) - // 1-byte-sentinel - buf[tagPos] = extendedEncodingSentinel - // mvcc-header - // - // NOTE: we don't use protoutil to avoid passing v.MVCCValueHeader through - // an interface, which would cause a heap allocation and incur the cost of - // dynamic dispatch. - if _, err := v.MVCCValueHeader.MarshalToSizedBuffer(buf[extendedPreludeSize:headerSize]); err != nil { - return nil, errors.Wrap(err, "marshaling MVCCValueHeader") - } - // <4-byte-checksum><1-byte-tag> or empty for tombstone - copy(buf[headerSize:], v.Value.RawBytes) - return buf, nil + return EncodeMVCCValueToBuf(v, nil) } // EncodeMVCCValueToBuf encodes an MVCCValue into its Pebble @@ -224,7 +189,11 @@ func EncodeMVCCValue(v MVCCValue) ([]byte, error) { // it is large enough. If the provided buffer is not large enough a // new buffer is allocated. // -// TODO(ssd): Dedup with EncodeMVCCValue. +// TODO(erikgrinaker): This could mid-stack inline when we compared +// v.MVCCValueHeader == enginepb.MVCCValueHeader{} instead of IsEmpty(), but +// struct comparisons have a significant performance regression in Go 1.19 which +// negates the inlining gain. Reconsider this with Go 1.20. See: +// https://github.com/cockroachdb/cockroach/issues/88818 func EncodeMVCCValueToBuf(v MVCCValue, buf []byte) ([]byte, error) { if v.MVCCValueHeader.IsEmpty() && !disableSimpleValueEncoding { // Simple encoding. Use the roachpb.Value encoding directly with no