Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: add MVCCExportFingerprint method and fingerprintWriter #90848

Merged
merged 2 commits into from
Nov 10, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
storage: add MVCCExportFingerprint method and fingerprintWriter
This change introduces a fingerprintWriter that
hashes every key/timestamp and value for point keys, and
combines their hashes via a XOR into a running aggregate.
Range keys are not fingerprinted but instead written to a pebble SST that is
returned to the caller. This is because range keys do not have a stable,
discrete identity and so it is up to the caller to define a deterministic
fingerprinting scheme across all returned range keys.

The fingerprintWriter is used by `MVCCExportFingerprint` that
exports a fingerprint for point keys in the keyrange
[StartKey, EndKey) over the interval (StartTS, EndTS].
The export logic used by `MVCCExportFingerprint` is the same
that drives `MVCCExportToSST`. The former writes to a fingerprintWriter
while the latter writes to an sstWriter. Currently, this
method only support using an `fnv64` hasher to fingerprint each KV.

This change does not wire `MVCCExportFingerprint` to ExportRequest
command evaluation. This will be done as a followup.

Informs: #89336

Release note: None
adityamaru committed Nov 10, 2022

Unverified

No user is associated with the committer email.
commit b0c957dcc99f8c1ba2b22aa8e8676aeb8e9a585a
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ go_library(
"doc.go",
"engine.go",
"engine_key.go",
"fingerprint_writer.go",
"in_mem.go",
"intent_interleaving_iter.go",
"intent_reader_writer.go",
146 changes: 146 additions & 0 deletions pkg/storage/fingerprint_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage

import (
"context"
"hash"
"io"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/errors"
)

// fingerprintWriter hashes every key/timestamp and value for point keys, and
// combines their hashes via a XOR into a running aggregate.
//
// Range keys are not fingerprinted but instead written to a pebble SST that is
// returned to the caller. This is because range keys do not have a stable,
// discrete identity and so it is up to the caller to define a deterministic
// fingerprinting scheme across all returned range keys.
//
// The caller must Finish() and Close() the fingerprintWriter to finalize the
// writes to the underlying pebble SST.
type fingerprintWriter struct {
hasher hash.Hash64
timestampBuf []byte

sstWriter *SSTWriter
xorAgg *uintXorAggregate
}

// makeFingerprintWriter creates a new fingerprintWriter.
func makeFingerprintWriter(
ctx context.Context, hasher hash.Hash64, cs *cluster.Settings, f io.Writer,
) fingerprintWriter {
// TODO(adityamaru,dt): Once
// https://github.com/cockroachdb/cockroach/issues/90450 has been addressed we
// should write to a kvBuf instead of a Backup SST writer.
sstWriter := MakeBackupSSTWriter(ctx, cs, f)
return fingerprintWriter{
sstWriter: &sstWriter,
hasher: hasher,
xorAgg: &uintXorAggregate{},
}
}

type uintXorAggregate struct {
sum uint64
}

// add inserts one value into the running xor.
func (a *uintXorAggregate) add(x uint64) {
a.sum = a.sum ^ x
}

// result returns the xor.
func (a *uintXorAggregate) result() uint64 {
return a.sum
}

// Finish finalizes the underlying SSTWriter, and returns the aggregated
// fingerprint for point keys.
func (f *fingerprintWriter) Finish() (uint64, error) {
// If no records were added to the sstable, skip completing it.
if f.sstWriter.DataSize != 0 {
if err := f.sstWriter.Finish(); err != nil {
return 0, err
}
}
return f.xorAgg.result(), nil
}

// Close finishes and frees memory and other resources. Close is idempotent.
func (f *fingerprintWriter) Close() {
if f.sstWriter == nil {
return
}
f.sstWriter.Close()
f.hasher.Reset()
f.xorAgg = nil
f.sstWriter = nil
}

var _ ExportWriter = &fingerprintWriter{}

// PutRawMVCCRangeKey implements the Writer interface.
func (f *fingerprintWriter) PutRawMVCCRangeKey(key MVCCRangeKey, bytes []byte) error {
// We do not fingerprint range keys, instead, we write them to a Pebble SST.
// This is because range keys do not have a stable, discrete identity and so
// it is up to the caller to define a deterministic fingerprinting scheme
// across all returned range keys.
return f.sstWriter.PutRawMVCCRangeKey(key, bytes)
}

// PutRawMVCC implements the Writer interface.
func (f *fingerprintWriter) PutRawMVCC(key MVCCKey, value []byte) error {
defer f.hasher.Reset()

// Hash the key/timestamp and value of the RawMVCC.
if err := f.hash(key.Key); err != nil {
return err
}
f.timestampBuf = EncodeMVCCTimestampToBuf(f.timestampBuf, key.Timestamp)
if err := f.hash(f.timestampBuf); err != nil {
return err
}
if err := f.hash(value); err != nil {
return err
}
f.xorAgg.add(f.hasher.Sum64())
return nil
}

// PutUnversioned implements the Writer interface.
func (f *fingerprintWriter) PutUnversioned(key roachpb.Key, value []byte) error {
defer f.hasher.Reset()

// Hash the key and value in the absence of a timestamp.
if err := f.hash(key); err != nil {
return err
}
if err := f.hash(value); err != nil {
return err
}

f.xorAgg.add(f.hasher.Sum64())
return nil
}

func (f *fingerprintWriter) hash(data []byte) error {
if _, err := f.hasher.Write(data); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
`"It never returns an error." -- https://golang.org/pkg/hash: %T`, f)
}

return nil
}
65 changes: 61 additions & 4 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
@@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
"hash/fnv"
"io"
"math"
"runtime"
@@ -5766,6 +5767,35 @@ func MVCCIsSpanEmpty(
return !valid, nil
}

// MVCCExportFingerprint exports a fingerprint for point keys in the keyrange
// [StartKey, EndKey) over the interval (StartTS, EndTS]. Each key/timestamp and
// value is hashed using a fnv64 hasher, and combined into a running aggregate
// via a XOR. On completion of the export this aggregate is returned as the
// fingerprint.
//
// Range keys are not fingerprinted but instead written to a pebble SST that is
// returned to the caller. This is because range keys do not have a stable,
// discrete identity and so it is up to the caller to define a deterministic
// fingerprinting scheme across all returned range keys.
func MVCCExportFingerprint(
ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer,
) (roachpb.BulkOpSummary, MVCCKey, uint64, error) {
ctx, span := tracing.ChildSpan(ctx, "storage.MVCCExportToSST")
defer span.Finish()

hasher := fnv.New64()
fingerprintWriter := makeFingerprintWriter(ctx, hasher, cs, dest)
defer fingerprintWriter.Close()

summary, resumeKey, err := mvccExportToWriter(ctx, reader, opts, &fingerprintWriter)
if err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, 0, err
}

fingerprint, err := fingerprintWriter.Finish()
return summary, resumeKey, fingerprint, err
}

// MVCCExportToSST exports changes to the keyrange [StartKey, EndKey) over the
// interval (StartTS, EndTS] as a Pebble SST. See mvccExportToWriter for more
// details.
@@ -5791,6 +5821,36 @@ func MVCCExportToSST(
return summary, resumeKey, sstWriter.Finish()
}

// ExportWriter is a trimmed down version of the Writer interface. It contains
// only those methods used during ExportRequest command evaluation.
type ExportWriter interface {
// PutRawMVCCRangeKey writes an MVCC range key with the provided encoded
// MVCCValue. It will replace any overlapping range keys at the given
// timestamp (even partial overlap). Only MVCC range tombstones, i.e. an empty
// value, are currently allowed (other kinds will need additional handling in
// MVCC APIs and elsewhere, e.g. stats and GC). It can be used to avoid
// decoding and immediately re-encoding an MVCCValue, but should generally be
// avoided due to the lack of type safety.
//
// It is safe to modify the contents of the arguments after PutRawMVCCRangeKey
// returns.
PutRawMVCCRangeKey(MVCCRangeKey, []byte) error
// PutRawMVCC sets the given key to the encoded MVCCValue. It requires that
// the timestamp is non-empty (see {PutUnversioned,PutIntent} if the timestamp
// is empty). It can be used to avoid decoding and immediately re-encoding an
// MVCCValue, but should generally be avoided due to the lack of type safety.
//
// It is safe to modify the contents of the arguments after PutRawMVCC
// returns.
PutRawMVCC(key MVCCKey, value []byte) error
// PutUnversioned sets the given key to the value provided. It is for use
// with inline metadata (not intents) and other unversioned keys (like
// Range-ID local keys).
//
// It is safe to modify the contents of the arguments after Put returns.
PutUnversioned(key roachpb.Key, value []byte) error
}

// mvccExportToWriter exports changes to the keyrange [StartKey, EndKey) over
// the interval (StartTS, EndTS] to the passed in writer. See MVCCExportOptions
// for options. StartTS may be zero.
@@ -5817,11 +5877,8 @@ func MVCCExportToSST(
// error is returned then the writer's contents are undefined. It is the
// responsibility of the caller to Finish() / Close() the passed in writer.
func mvccExportToWriter(
ctx context.Context, reader Reader, opts MVCCExportOptions, writer Writer,
ctx context.Context, reader Reader, opts MVCCExportOptions, writer ExportWriter,
) (roachpb.BulkOpSummary, MVCCKey, error) {
ctx, span := tracing.ChildSpan(ctx, "storage.mvccExportToWriter")
defer span.Finish()

// If we're not exporting all revisions then we can mask point keys below any
// MVCC range tombstones, since we don't care about them.
var rangeKeyMasking hlc.Timestamp
32 changes: 27 additions & 5 deletions pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
@@ -97,7 +97,7 @@ var (
// put_rangekey ts=<int>[,<int>] [localTs=<int>[,<int>]] k=<key> end=<key>
// get [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=<int>[,<int>]] [globalUncertaintyLimit=<int>[,<int>]]
// scan [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [end=<key>] [inconsistent] [skipLocked] [tombstones] [reverse] [failOnMoreRecent] [localUncertaintyLimit=<int>[,<int>]] [globalUncertaintyLimit=<int>[,<int>]] [max=<max>] [targetbytes=<target>] [allowEmpty]
// export [k=<key>] [end=<key>] [ts=<int>[,<int>]] [kTs=<int>[,<int>]] [startTs=<int>[,<int>]] [maxIntents=<int>] [allRevisions] [targetSize=<int>] [maxSize=<int>] [stopMidKey]
// export [k=<key>] [end=<key>] [ts=<int>[,<int>]] [kTs=<int>[,<int>]] [startTs=<int>[,<int>]] [maxIntents=<int>] [allRevisions] [targetSize=<int>] [maxSize=<int>] [stopMidKey] [fingerprint]
//
// iter_new [k=<key>] [end=<key>] [prefix] [kind=key|keyAndIntents] [types=pointsOnly|pointsWithRanges|pointsAndRanges|rangesOnly] [pointSynthesis] [maskBelow=<int>[,<int>]]
// iter_new_incremental [k=<key>] [end=<key>] [startTs=<int>[,<int>]] [endTs=<int>[,<int>]] [types=pointsOnly|pointsWithRanges|pointsAndRanges|rangesOnly] [maskBelow=<int>[,<int>]] [intents=error|aggregate|emit]
@@ -1331,22 +1331,44 @@ func cmdExport(e *evalCtx) error {
if e.hasArg("maxSize") {
e.scanArg("maxSize", &opts.MaxSize)
}
var shouldFingerprint bool
if e.hasArg("fingerprint") {
shouldFingerprint = true
}

r := e.newReader()
defer r.Close()

sstFile := &storage.MemFile{}
summary, resume, err := storage.MVCCExportToSST(e.ctx, e.st, r, opts, sstFile)
if err != nil {
return err

var summary roachpb.BulkOpSummary
var resume storage.MVCCKey
var fingerprint uint64
var err error
if shouldFingerprint {
summary, resume, fingerprint, err = storage.MVCCExportFingerprint(e.ctx, e.st, r, opts, sstFile)
if err != nil {
return err
}
e.results.buf.Printf("export: %s", &summary)
e.results.buf.Print(" fingerprint=true")
} else {
summary, resume, err = storage.MVCCExportToSST(e.ctx, e.st, r, opts, sstFile)
if err != nil {
return err
}
e.results.buf.Printf("export: %s", &summary)
}

e.results.buf.Printf("export: %s", &summary)
if resume.Key != nil {
e.results.buf.Printf(" resume=%s", resume)
}
e.results.buf.Printf("\n")

if shouldFingerprint {
e.results.buf.Printf("fingerprint: %d\n", fingerprint)
}

iter, err := storage.NewMemSSTIterator(sstFile.Bytes(), false /* verify */, storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
UpperBound: keys.MaxKey,
17 changes: 17 additions & 0 deletions pkg/storage/mvcc_key.go
Original file line number Diff line number Diff line change
@@ -258,6 +258,23 @@ func encodeMVCCTimestampSuffixToBuf(buf []byte, ts hlc.Timestamp) []byte {
return buf
}

// EncodeMVCCTimestampToBuf encodes an MVCC timestamp into its Pebble
// representation, excluding the length suffix and sentinel byte, reusing the
// given byte slice if it has sufficient capacity.
func EncodeMVCCTimestampToBuf(buf []byte, ts hlc.Timestamp) []byte {
tsLen := encodedMVCCTimestampLength(ts)
if tsLen == 0 {
return buf[:0]
}
if cap(buf) < tsLen {
buf = make([]byte, tsLen)
} else {
buf = buf[:tsLen]
}
encodeMVCCTimestampToBuf(buf, ts)
return buf
}

// encodeMVCCTimestampToBuf encodes an MVCC timestamp into its Pebble
// representation, excluding the length suffix and sentinel byte. The target
// buffer must have the correct size, and the timestamp must not be empty.
9 changes: 9 additions & 0 deletions pkg/storage/mvcc_key_test.go
Original file line number Diff line number Diff line change
@@ -179,6 +179,8 @@ func TestEncodeDecodeMVCCKeyAndTimestampWithLength(t *testing.T) {
"logical and synthetic": {"foo", hlc.Timestamp{Logical: 65535, Synthetic: true}, "666f6f0000000000000000000000ffff010e"},
"all": {"foo", hlc.Timestamp{WallTime: 1643550788737652545, Logical: 65535, Synthetic: true}, "666f6f0016cf10bc050557410000ffff010e"},
}

buf := []byte{}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {

@@ -234,6 +236,13 @@ func TestEncodeDecodeMVCCKeyAndTimestampWithLength(t *testing.T) {
decodedTS, err = decodeMVCCTimestamp(encodedTS)
require.NoError(t, err)
require.Equal(t, tc.ts, decodedTS)

buf = EncodeMVCCTimestampToBuf(buf, tc.ts)
if expectTS == nil {
require.Empty(t, buf)
} else {
require.Equal(t, expectTS, buf)
}
})
}
}
Loading