Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importer/kv: add ImportEpoch field to MVCCValueHeader and write to it during IMPORT INTO #85138

Merged
merged 2 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ func backup(
backupManifest.StartTime,
backupManifest.EndTime,
backupManifest.ElidedPrefix,
backupManifest.ClusterVersion.AtLeast(clusterversion.V24_1.Version()),
)
if err != nil {
return roachpb.RowCount{}, 0, err
Expand Down
14 changes: 7 additions & 7 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,14 +487,14 @@ func runBackupProcessor(
if !span.firstKeyTS.IsEmpty() {
splitMidKey = true
}

req := &kvpb.ExportRequest{
RequestHeader: kvpb.RequestHeaderFromSpan(span.span),
ResumeKeyTS: span.firstKeyTS,
StartTime: span.start,
MVCCFilter: spec.MVCCFilter,
TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV),
SplitMidKey: splitMidKey,
RequestHeader: kvpb.RequestHeaderFromSpan(span.span),
ResumeKeyTS: span.firstKeyTS,
StartTime: span.start,
MVCCFilter: spec.MVCCFilter,
TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV),
SplitMidKey: splitMidKey,
IncludeMVCCValueHeader: spec.IncludeMVCCValueHeader,
}

// If we're doing re-attempts but are not yet in the priority regime,
Expand Down
45 changes: 24 additions & 21 deletions pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func distBackupPlanSpecs(
mvccFilter kvpb.MVCCFilter,
startTime, endTime hlc.Timestamp,
elide execinfrapb.ElidePrefix,
includeValueHeader bool,
) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs")
Expand Down Expand Up @@ -98,17 +99,18 @@ func distBackupPlanSpecs(
sqlInstanceIDToSpec := make(map[base.SQLInstanceID]*execinfrapb.BackupDataSpec)
for _, partition := range spanPartitions {
spec := &execinfrapb.BackupDataSpec{
JobID: jobID,
Spans: partition.Spans,
DefaultURI: defaultURI,
URIsByLocalityKV: urisByLocalityKV,
MVCCFilter: mvccFilter,
Encryption: fileEncryption,
PKIDs: pkIDs,
BackupStartTime: startTime,
BackupEndTime: endTime,
UserProto: user.EncodeProto(),
ElidePrefix: elide,
JobID: jobID,
Spans: partition.Spans,
DefaultURI: defaultURI,
URIsByLocalityKV: urisByLocalityKV,
MVCCFilter: mvccFilter,
Encryption: fileEncryption,
PKIDs: pkIDs,
BackupStartTime: startTime,
BackupEndTime: endTime,
UserProto: user.EncodeProto(),
ElidePrefix: elide,
IncludeMVCCValueHeader: includeValueHeader,
}
sqlInstanceIDToSpec[partition.SQLInstanceID] = spec
}
Expand All @@ -121,16 +123,17 @@ func distBackupPlanSpecs(
// which is not the leaseholder for any of the spans, but is for an
// introduced span.
spec := &execinfrapb.BackupDataSpec{
JobID: jobID,
IntroducedSpans: partition.Spans,
DefaultURI: defaultURI,
URIsByLocalityKV: urisByLocalityKV,
MVCCFilter: mvccFilter,
Encryption: fileEncryption,
PKIDs: pkIDs,
BackupStartTime: startTime,
BackupEndTime: endTime,
UserProto: user.EncodeProto(),
JobID: jobID,
IntroducedSpans: partition.Spans,
DefaultURI: defaultURI,
URIsByLocalityKV: urisByLocalityKV,
MVCCFilter: mvccFilter,
Encryption: fileEncryption,
PKIDs: pkIDs,
BackupStartTime: startTime,
BackupEndTime: endTime,
UserProto: user.EncodeProto(),
IncludeMVCCValueHeader: includeValueHeader,
}
sqlInstanceIDToSpec[partition.SQLInstanceID] = spec
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/bulk"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -554,7 +553,10 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
return summary, err
}
valueScratch = append(valueScratch[:0], v...)
value := roachpb.Value{RawBytes: valueScratch}
value, err := storage.DecodeValueFromMVCCValue(valueScratch)
if err != nil {
return summary, err
}

key.Key, ok, err = kr.RewriteKey(key.Key, key.Timestamp.WallTime)

Expand All @@ -581,7 +583,14 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
if verbose {
log.Infof(ctx, "Put %s -> %s", key.Key, value.PrettyPrint())
}
if err := batcher.AddMVCCKey(ctx, key, value.RawBytes); err != nil {

// Using valueScratch here assumes that
// DecodeValueFromMVCCValue, ClearChecksum, and
// InitChecksum don't copy/reallocate the slice they
// were given. We expect that value.ClearChecksum and
// value.InitChecksum calls above have modified
// valueScratch.
if err := batcher.AddMVCCKey(ctx, key, valueScratch); err != nil {
return summary, errors.Wrapf(err, "adding to batch: %s -> %s", key, value.PrettyPrint())
}
}
Expand Down
20 changes: 17 additions & 3 deletions pkg/kv/bulk/buffering_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ type BufferingAdder struct {
// name of the BufferingAdder for the purpose of logging only.
name string

// importEpoch specifies the ImportEpoch of the table the BufferingAdder
// is ingesting data as part of an IMPORT INTO job. If specified, the Bulk
// Adder's SSTBatcher will write the import epoch to each versioned value's
// metadata.
importEpoch uint32

bulkMon *mon.BytesMonitor
memAcc mon.BoundAccount

Expand Down Expand Up @@ -96,7 +102,8 @@ func MakeBulkAdder(
}

b := &BufferingAdder{
name: opts.Name,
name: opts.Name,
importEpoch: opts.ImportEpoch,
sink: SSTBatcher{
name: opts.Name,
db: db,
Expand Down Expand Up @@ -303,8 +310,15 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {

for i := range b.curBuf.entries {
mvccKey.Key = b.curBuf.Key(i)
if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil {
return err
if b.importEpoch != 0 {
if err := b.sink.AddMVCCKeyWithImportEpoch(ctx, mvccKey, b.curBuf.Value(i),
b.importEpoch); err != nil {
return err
}
} else {
if err := b.sink.AddMVCCKey(ctx, mvccKey, b.curBuf.Value(i)); err != nil {
return err
}
}
}
if err := b.sink.Flush(ctx); err != nil {
Expand Down
18 changes: 16 additions & 2 deletions pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,21 @@ func (b *SSTBatcher) SetOnFlush(onFlush func(summary kvpb.BulkOpSummary)) {
b.mu.onFlush = onFlush
}

// AddMVCCKeyWithImportEpoch decodes value as an MVCCValue, sets the
// ImportEpoch field of its MVCCValueHeader to importEpoch, re-encodes the
// result, and adds it to the batch under key via AddMVCCKey. An error from
// decoding, encoding, or AddMVCCKey is returned to the caller unchanged.
func (b *SSTBatcher) AddMVCCKeyWithImportEpoch(
ctx context.Context, key storage.MVCCKey, value []byte, importEpoch uint32,
) error {
mvccVal, err := storage.DecodeMVCCValue(value)
if err != nil {
return err
}
mvccVal.MVCCValueHeader.ImportEpoch = importEpoch
encVal, err := storage.EncodeMVCCValue(mvccVal)
if err != nil {
return err
}
return b.AddMVCCKey(ctx, key, encVal)
Comment on lines +335 to +344
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to consider adding a function to decode-modify-encode an MVCCValue (taking a callback), so that we can make use of the faster inlined functions. But we can do that after this PR too.

}

// AddMVCCKey adds a key+timestamp/value pair to the batch (flushing if needed).
// This is only for callers that want to control the timestamp on individual
// keys -- like RESTORE where we want the restored data to look like the backup.
Expand Down Expand Up @@ -389,8 +404,7 @@ func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value
if !b.disallowShadowingBelow.IsEmpty() {
b.updateMVCCStats(key, value)
}

return b.sstWriter.Put(key, value)
return b.sstWriter.PutRawMVCC(key, value)
}

// Reset clears all state in the batcher and prepares it for reuse.
Expand Down
77 changes: 76 additions & 1 deletion pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
b, err := bulk.MakeBulkAdder(
ctx, kvDB, mockCache, s.ClusterSettings(), ts, kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs,
ctx, kvDB, mockCache, s.ClusterSettings(), ts,
kvserverbase.BulkAdderOptions{MaxBufferSize: batchSize}, mem, reqs,
)
require.NoError(t, err)

Expand Down Expand Up @@ -361,3 +362,77 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
})
}
}

var DummyImportEpoch uint32 = 3

stevendanna marked this conversation as resolved.
Show resolved Hide resolved
// TestImportEpochIngestion verifies that values written through
// SSTBatcher.AddMVCCKeyWithImportEpoch carry the supplied import epoch in
// their MVCCValueHeader. It ingests two keys, exports the span
// [startKey, endKey) with IncludeMVCCValueHeader set, and checks that every
// exported value decodes to the original value, has ImportEpoch equal to
// DummyImportEpoch, and has an empty LocalTimestamp.
func TestImportEpochIngestion(t *testing.T) {
	defer leaktest.AfterTest(t)()

	defer log.Scope(t).Close(t)
	ctx := context.Background()

	mem := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil)
	reqs := limit.MakeConcurrentRequestLimiter("reqs", 1000)
	s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
	defer s.Stopper().Stop(ctx)

	b, err := bulk.MakeTestingSSTBatcher(ctx, kvDB, s.ClusterSettings(),
		false, true, mem.MakeConcurrentBoundAccount(), reqs)
	require.NoError(t, err)
	defer b.Close(ctx)

	startKey := storageutils.PointKey("a", 1)
	endKey := storageutils.PointKey("b", 1)
	value := storageutils.StringValueRaw("myHumbleValue")
	mvccValue, err := storage.DecodeMVCCValue(value)
	require.NoError(t, err)

	require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, startKey, value, DummyImportEpoch))
	require.NoError(t, b.AddMVCCKeyWithImportEpoch(ctx, endKey, value, DummyImportEpoch))
	require.NoError(t, b.Flush(ctx))

	// Check that the ingested key contains the dummy import epoch. The
	// request's EndKey is endKey.Key, so only startKey is expected back.
	req := &kvpb.ExportRequest{
		RequestHeader: kvpb.RequestHeader{
			Key:    startKey.Key,
			EndKey: endKey.Key,
		},
		MVCCFilter:             kvpb.MVCCFilter_All,
		StartTime:              hlc.Timestamp{},
		IncludeMVCCValueHeader: true,
	}

	header := kvpb.Header{Timestamp: s.Clock().Now()}
	resp, roachErr := kv.SendWrappedWith(ctx,
		kvDB.NonTransactionalSender(), header, req)
	require.NoError(t, roachErr.GoError())
	iterOpts := storage.IterOptions{
		KeyTypes:   storage.IterKeyTypePointsOnly,
		LowerBound: startKey.Key,
		UpperBound: endKey.Key,
	}

	checkedImportEpoch := false
	for _, file := range resp.(*kvpb.ExportResponse).Files {
		// Run each file in its own function so the deferred Close fires at the
		// end of the iteration instead of piling up until the test returns.
		func() {
			it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, iterOpts)
			require.NoError(t, err)
			defer it.Close()
			for it.SeekGE(storage.NilKey); ; it.Next() {
				ok, err := it.Valid()
				require.NoError(t, err)
				if !ok {
					break
				}
				rawVal, err := it.UnsafeValue()
				require.NoError(t, err)
				val, err := storage.DecodeMVCCValue(rawVal)
				require.NoError(t, err)
				require.Equal(t, startKey, it.UnsafeKey())
				require.Equal(t, mvccValue.Value, val.Value)
				require.Equal(t, DummyImportEpoch, val.ImportEpoch)
				require.Equal(t, hlc.ClockTimestamp{}, val.LocalTimestamp)
				checkedImportEpoch = true
			}
		}()
	}
	// Guard against a vacuous pass when the export returns no values.
	require.True(t, checkedImportEpoch)
}
9 changes: 8 additions & 1 deletion pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1831,7 +1831,14 @@ message ExportRequest {

FingerprintOptions fingerprint_options = 15 [(gogoproto.nullable) = false];

// Next ID: 16
// IncludeMVCCValueHeader controls whether the MVCCValueHeader is
// included in exported bytes. Callers should only set this when all
// readers of the returned SST are prepared to parse a full
// MVCCValue. Even when set, only fields appropriate for export are
// included. See storage.EncodeMVCCValueForExport for details.
bool include_mvcc_value_header = 16 [(gogoproto.customname) = "IncludeMVCCValueHeader"];

// Next ID: 17
}

message FingerprintOptions {
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func EvalAddSSTable(
// * Only SST set operations (not explicitly verified).
// * No intents or unversioned values.
// * If sstTimestamp is set, all MVCC timestamps equal it.
// * MVCCValueHeader is empty.
// * The LocalTimestamp in the MVCCValueHeader is empty.
// * Given MVCC stats match the SST contents.
func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.MVCCStats) error {

Expand Down Expand Up @@ -584,8 +584,9 @@ func assertSSTContents(sst []byte, sstTimestamp hlc.Timestamp, stats *enginepb.M
return errors.NewAssertionErrorWithWrappedErrf(err,
"SST contains invalid value for key %s", key)
}
if value.MVCCValueHeader != (enginepb.MVCCValueHeader{}) {
return errors.AssertionFailedf("SST contains non-empty MVCC value header for key %s", key)
if !value.MVCCValueHeader.LocalTimestamp.IsEmpty() {
return errors.AssertionFailedf("SST contains non-empty Local Timestamp in the MVCC value"+
" header for key %s", key)
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestEvalAddSSTable(t *testing.T) {
sst: kvs{pointKVWithLocalTS("a", 2, 1, "a2")},
expect: kvs{pointKVWithLocalTS("a", 2, 1, "a2")},
expectStatsEst: true,
expectErrRace: `SST contains non-empty MVCC value header for key "a"/2.000000000,0`,
expectErrRace: `SST contains non-empty Local Timestamp in the MVCC value header for key "a"/2.000000000,0`,
},
"blind rejects local timestamp on range key under race only": { // unfortunately, for performance
sst: kvs{rangeKVWithLocalTS("a", "d", 2, 1, "")},
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestEvalAddSSTable(t *testing.T) {
sst: kvs{pointKVWithLocalTS("a", 2, 1, "a2")},
expect: kvs{pointKVWithLocalTS("a", 10, 1, "a2")},
expectStatsEst: true,
expectErrRace: `SST contains non-empty MVCC value header for key "a"/2.000000000,0`,
expectErrRace: `SST contains non-empty Local Timestamp in the MVCC value header for key "a"/2.000000000,0`,
},
"SSTTimestampToRequestTimestamp with DisallowConflicts causes estimated stats with range key masking": {
reqTS: 5,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func evalExport(
TargetLockConflictBytes: targetLockConflictBytes,
StopMidKey: args.SplitMidKey,
ScanStats: cArgs.ScanStats,
IncludeMVCCValueHeader: args.IncludeMVCCValueHeader,
}
var summary kvpb.BulkOpSummary
var resumeInfo storage.ExportRequestResumeInfo
Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/kvserverbase/bulk_adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ type BulkAdderOptions struct {
// the first buffer to pick split points in the hope it is a representative
// sample of the overall input.
InitialSplitsIfUnordered int

// ImportEpoch specifies the ImportEpoch of the table the BulkAdder
// is ingesting data into as part of an IMPORT INTO job. If specified, the Bulk
// Adder's SSTBatcher will write the import epoch to each versioned value's
// metadata.
//
// Callers should check that the cluster is at or above
// version 24.1 before setting this option.
ImportEpoch uint32
}

// BulkAdderFactory describes a factory function for BulkAdders.
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/execinfrapb/processors_bulk_io.proto
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,14 @@ message BackupDataSpec {
optional string user_proto = 10 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"];

optional ElidePrefix elide_prefix = 12 [(gogoproto.nullable) = false];
// NEXTID: 13.

// IncludeMVCCValueHeader indicates whether the backup should be
// created with MVCCValueHeaders in the exported data. This should
// only be set on backups starting on cluster version 24.1 or
// greater.
optional bool include_mvcc_value_header = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "IncludeMVCCValueHeader"];

// NEXTID: 14.
}

message RestoreFileSpec {
Expand Down
Loading
Loading