Skip to content

Commit

Permalink
storage: add ClientMeta field to MVCCValueHeader
Browse files Browse the repository at this point in the history
This patch adds the ClientMeta field to an MVCCValue's MVCCValueHeader,
which allows kv clients to write metadata to an MVCCValue. Within the
ClientMeta field, this patch adds the ImportJobId field which IMPORT will
eventually populate and depend on during an IMPORT rollback. Unlike the
MVCCValueHeader.LocalTimestamp field, the ClientMeta field should be exported
to other clusters (e.g. via ExportRequests from BACKUP/RESTORE and streaming).
Consequently, this patch relaxes the invariant that the MVCCValueHeader must
be stripped in an ExportRequest and must be empty in an AddSSTable request.
Now, an ExportRequest strips only the MVCCValueHeader.LocalTimestamp field, and
AddSSTable requires only that the LocalTimestamp be empty.

Release note: none
  • Loading branch information
msbutler committed Jul 29, 2022
1 parent 4df883b commit 107909d
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 22 deletions.
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func EvalAddSSTable(
// * Only SST set operations (not explicitly verified).
// * No intents or unversioned values.
// * If sstTimestamp is set, all MVCC timestamps equal it.
// * MVCCValueHeader is empty.
// * The LocalTimestamp in the MVCCValueHeader is empty.
// * Given MVCC stats match the SST contents.
func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.MVCCStats) error {

Expand Down Expand Up @@ -495,8 +495,9 @@ func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.M
return errors.NewAssertionErrorWithWrappedErrf(err,
"SST contains invalid value for key %s", key)
}
if value.MVCCValueHeader != (enginepb.MVCCValueHeader{}) {
return errors.AssertionFailedf("SST contains non-empty MVCC value header for key %s", key)
if !value.MVCCValueHeader.LocalTimestamp.IsEmpty() {
return errors.AssertionFailedf("SST contains non-empty Local Timestamp in the MVCC value"+
" header for key %s", key)
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestEvalAddSSTable(t *testing.T) {
sst: kvs{pointKVWithLocalTS("a", 2, 1, "a2")},
expect: kvs{pointKVWithLocalTS("a", 2, 1, "a2")},
expectStatsEst: true,
expectErrRace: `SST contains non-empty MVCC value header for key "a"/0.000000002,0`,
expectErrRace: `SST contains non-empty Local Timestamp in the MVCC value header for key "a"/0.000000002,0`,
},
"blind rejects local timestamp on range key under race only": { // unfortunately, for performance
sst: kvs{rangeKVWithLocalTS("a", "d", 2, 1, "")},
Expand Down Expand Up @@ -293,7 +293,7 @@ func TestEvalAddSSTable(t *testing.T) {
sst: kvs{pointKVWithLocalTS("a", 2, 1, "a2")},
expect: kvs{pointKVWithLocalTS("a", 10, 1, "a2")},
expectStatsEst: true,
expectErrRace: `SST contains non-empty MVCC value header for key "a"/0.000000002,0`,
expectErrRace: `SST contains non-empty Local Timestamp in the MVCC value header for key "a"/0.000000002,0`,
},

// DisallowConflicts
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/enginepb/mvcc3.proto
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ message MVCCValueHeader {
// to stale reads.
util.hlc.Timestamp local_timestamp = 1 [(gogoproto.nullable) = false,
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/util/hlc.ClockTimestamp"];

ClientMeta client_meta = 2 [(gogoproto.nullable) = false];
}

// ClientMeta contains client-defined metadata for a versioned value.
message ClientMeta {
option (gogoproto.equal) = true;
// ImportJobID identifies the job ID of the import job that imported the
// versioned value.
int64 import_job_id = 1;
}

// MVCCStatsDelta is convertible to MVCCStats, but uses signed variable width
Expand Down
20 changes: 18 additions & 2 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5603,8 +5603,24 @@ func MVCCExportToSST(
return roachpb.BulkOpSummary{}, MVCCKey{}, errors.Wrapf(err, "decoding mvcc value %s", unsafeKey)
}

// Export only the inner roachpb.Value, not the MVCCValue header.
unsafeValue = mvccValue.Value.RawBytes
// Export only the inner roachpb.Value, not the MVCCValue header, if the
// point key was not imported (aside: range keys are never imported).
//
// TODO(msbutler): plumb in-progress import job IDs into the export
// request, as we only need to preserve the import job ID from
// in-progress imports.
if mvccValue.ClientMeta.ImportJobId == 0 {
unsafeValue = mvccValue.Value.RawBytes
} else {
// Preserve the import job ID, but drop the local timestamp.
mvccValue.StripMVCCValueHeaderForExport()
unsafeValue, err = EncodeMVCCValue(mvccValue)
if err != nil {
return roachpb.BulkOpSummary{}, MVCCKey{}, errors.Wrapf(err,
"repackaging imported mvcc value %s",
unsafeKey)
}
}

// Skip tombstone records when start time is zero (non-incremental)
// and we are not exporting all versions.
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/mvcc_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,20 @@ func (v MVCCValue) SafeFormat(w redact.SafePrinter, _ rune) {
if !v.LocalTimestamp.IsEmpty() {
w.Printf("localTs=%s", v.LocalTimestamp)
}
if v.ClientMeta != (enginepb.ClientMeta{}) {
w.Printf(", ClientMeta={importJobID=%v}", v.ClientMeta.ImportJobId)
}
w.Printf("}")
}
w.Print(v.Value.PrettyPrint())
}

// StripMVCCValueHeaderForExport strips fields from the MVCCValueHeader that
// should not get exported out of the cluster.
//
// It mutates v in place: LocalTimestamp is reset to its zero value, while
// every other header field (such as ClientMeta) is left untouched.
func (v *MVCCValue) StripMVCCValueHeaderForExport() {
	v.MVCCValueHeader.LocalTimestamp = hlc.ClockTimestamp{}
}

// When running a metamorphic build, disable the simple MVCC value encoding to
// prevent code from assuming that the MVCCValue encoding is identical to the
// roachpb.Value encoding.
Expand Down Expand Up @@ -283,6 +292,7 @@ func decodeExtendedMVCCValue(buf []byte) (MVCCValue, error) {
}
var v MVCCValue
v.LocalTimestamp = header.LocalTimestamp
v.ClientMeta = header.ClientMeta
v.Value.RawBytes = buf[headerSize:]
return v, nil
}
Expand Down
46 changes: 31 additions & 15 deletions pkg/storage/mvcc_value_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func TestMVCCValueLocalTimestampNeeded(t *testing.T) {
mvccVal.LocalTimestamp = hlc.ClockTimestamp(tc.localTs)

require.Equal(t, tc.expect, mvccVal.LocalTimestampNeeded(tc.versionTs))
mvccVal.StripMVCCValueHeaderForExport()
require.Equal(t, true, mvccVal.LocalTimestamp.IsEmpty())
})
}
}
Expand Down Expand Up @@ -88,16 +90,22 @@ func TestMVCCValueFormat(t *testing.T) {
valHeader := enginepb.MVCCValueHeader{}
valHeader.LocalTimestamp = hlc.ClockTimestamp{WallTime: 9}

valHeaderWithJobId := valHeader
valHeaderWithJobId.ClientMeta = enginepb.ClientMeta{ImportJobId: 999}

testcases := map[string]struct {
val MVCCValue
expect string
}{
"tombstone": {val: MVCCValue{}, expect: "/<empty>"},
"bytes": {val: MVCCValue{Value: strVal}, expect: "/BYTES/foo"},
"int": {val: MVCCValue{Value: intVal}, expect: "/INT/17"},
"header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}, expect: "{localTs=0.000000009,0}/<empty>"},
"header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}, expect: "{localTs=0.000000009,0}/BYTES/foo"},
"header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}, expect: "{localTs=0.000000009,0}/INT/17"},
"tombstone": {val: MVCCValue{}, expect: "/<empty>"},
"bytes": {val: MVCCValue{Value: strVal}, expect: "/BYTES/foo"},
"int": {val: MVCCValue{Value: intVal}, expect: "/INT/17"},
"header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}, expect: "{localTs=0.000000009,0}/<empty>"},
"header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}, expect: "{localTs=0.000000009,0}/BYTES/foo"},
"header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}, expect: "{localTs=0.000000009,0}/INT/17"},
"headerJobID+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobId}, expect: "{localTs=0.000000009,0, ClientMeta={importJobID=999}}/<empty>"},
"headerJobID+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobId, Value: strVal}, expect: "{localTs=0.000000009,0, ClientMeta={importJobID=999}}/BYTES/foo"},
"headerJobID+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobId, Value: intVal}, expect: "{localTs=0.000000009,0, ClientMeta={importJobID=999}}/INT/17"},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
Expand All @@ -117,16 +125,22 @@ func TestEncodeDecodeMVCCValue(t *testing.T) {
valHeader := enginepb.MVCCValueHeader{}
valHeader.LocalTimestamp = hlc.ClockTimestamp{WallTime: 9}

valHeaderWithJobId := valHeader
valHeaderWithJobId.ClientMeta = enginepb.ClientMeta{ImportJobId: 999}

testcases := map[string]struct {
val MVCCValue
expect []byte
}{
"tombstone": {val: MVCCValue{}, expect: nil},
"bytes": {val: MVCCValue{Value: strVal}, expect: []byte{0x0, 0x0, 0x0, 0x0, 0x3, 0x66, 0x6f, 0x6f}},
"int": {val: MVCCValue{Value: intVal}, expect: []byte{0x0, 0x0, 0x0, 0x0, 0x1, 0x22}},
"header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}, expect: []byte{0x0, 0x0, 0x0, 0x4, 0x65, 0xa, 0x2, 0x8, 0x9}},
"header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}, expect: []byte{0x0, 0x0, 0x0, 0x4, 0x65, 0xa, 0x2, 0x8, 0x9, 0x0, 0x0, 0x0, 0x0, 0x3, 0x66, 0x6f, 0x6f}},
"header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}, expect: []byte{0x0, 0x0, 0x0, 0x4, 0x65, 0xa, 0x2, 0x8, 0x9, 0x0, 0x0, 0x0, 0x0, 0x1, 0x22}},
"tombstone": {val: MVCCValue{}, expect: nil},
"bytes": {val: MVCCValue{Value: strVal}, expect: []byte{0x0, 0x0, 0x0, 0x0, 0x3, 0x66, 0x6f, 0x6f}},
"int": {val: MVCCValue{Value: intVal}, expect: []byte{0x0, 0x0, 0x0, 0x0, 0x1, 0x22}},
"header+tombstone": {val: MVCCValue{MVCCValueHeader: valHeader}, expect: []byte{0x0, 0x0, 0x0, 0x6, 0x65, 0xa, 0x2, 0x8, 0x9, 0x12, 0x0}},
"header+bytes": {val: MVCCValue{MVCCValueHeader: valHeader, Value: strVal}, expect: []byte{0x0, 0x0, 0x0, 0x6, 0x65, 0xa, 0x2, 0x8, 0x9, 0x12, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x66, 0x6f, 0x6f}},
"header+int": {val: MVCCValue{MVCCValueHeader: valHeader, Value: intVal}, expect: []byte{0x0, 0x0, 0x0, 0x6, 0x65, 0xa, 0x2, 0x8, 0x9, 0x12, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x22}},
"headerJobID+tombstone": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobId}, expect: []byte{0x0, 0x0, 0x0, 0x9, 0x65, 0xa, 0x2, 0x8, 0x9, 0x12, 0x3, 0x8, 0xe7, 0x7}},
"headerJobID+bytes": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobId, Value: strVal}, expect: []byte{0x0, 0x0, 0x0, 0x9, 0x65, 0xa, 0x2, 0x8, 0x9, 0x12, 0x3, 0x8, 0xe7, 0x7, 0x0, 0x0, 0x0, 0x0, 0x3, 0x66, 0x6f, 0x6f}},
"headerJobID+int": {val: MVCCValue{MVCCValueHeader: valHeaderWithJobId, Value: intVal}, expect: []byte{0x0, 0x0, 0x0, 0x9, 0x65, 0xa, 0x2, 0x8, 0x9, 0x12, 0x3, 0x8, 0xe7, 0x7, 0x0, 0x0, 0x0, 0x0, 0x1, 0x22}},
}
for name, tc := range testcases {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -171,9 +185,11 @@ var mvccValueBenchmarkConfigs = struct {
values map[string]roachpb.Value
}{
headers: map[string]enginepb.MVCCValueHeader{
"empty": {},
"local walltime": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545}},
"local walltime+logical": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545, Logical: 4096}},
"empty": {},
"local walltime": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545}},
"local walltime+logical": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545, Logical: 4096}},
"local walltime+jobID": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545}, ClientMeta: enginepb.ClientMeta{ImportJobId: 4096}},
"local walltime+logical+jobID": {LocalTimestamp: hlc.ClockTimestamp{WallTime: 1643550788737652545, Logical: 4096}, ClientMeta: enginepb.ClientMeta{ImportJobId: 4096}},
},
values: map[string]roachpb.Value{
"tombstone": {},
Expand Down

0 comments on commit 107909d

Please sign in to comment.