code review feedback
stevendanna committed Mar 12, 2024
1 parent 429ac73 commit ae9a188
Showing 9 changed files with 85 additions and 40 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup_job.go
@@ -244,6 +244,7 @@ func backup(
 		backupManifest.StartTime,
 		backupManifest.EndTime,
 		backupManifest.ElidedPrefix,
+		backupManifest.ClusterVersion.AtLeast(clusterversion.V24_1.Version()),
 	)
 	if err != nil {
 		return roachpb.RowCount{}, 0, err
4 changes: 1 addition & 3 deletions pkg/ccl/backupccl/backup_processor.go
@@ -16,7 +16,6 @@ import (

 	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
 	"github.com/cockroachdb/cockroach/pkg/cloud"
-	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
@@ -482,7 +481,6 @@ func runBackupProcessor(
 		case spans := <-todo:
 			for _, span := range spans {
 				for len(span.span.Key) != 0 {
-					includeMVCCValueHeader := clusterSettings.Version.IsActive(ctx, clusterversion.V24_1)
 					splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV)
 					// If we started splitting already, we must continue until we reach the end
 					// of split span.
@@ -496,7 +494,7 @@ func runBackupProcessor(
 						MVCCFilter:             spec.MVCCFilter,
 						TargetFileSize:         batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV),
 						SplitMidKey:            splitMidKey,
-						IncludeMVCCValueHeader: includeMVCCValueHeader,
+						IncludeMVCCValueHeader: spec.IncludeMVCCValueHeader,
 					}

 					// If we're doing re-attempts but are not yet in the priority regime,
45 changes: 24 additions & 21 deletions pkg/ccl/backupccl/backup_processor_planning.go
@@ -46,6 +46,7 @@ func distBackupPlanSpecs(
 	mvccFilter kvpb.MVCCFilter,
 	startTime, endTime hlc.Timestamp,
 	elide execinfrapb.ElidePrefix,
+	includeValueHeader bool,
 ) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) {
 	var span *tracing.Span
 	ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs")
@@ -98,17 +99,18 @@ func distBackupPlanSpecs(
 	sqlInstanceIDToSpec := make(map[base.SQLInstanceID]*execinfrapb.BackupDataSpec)
 	for _, partition := range spanPartitions {
 		spec := &execinfrapb.BackupDataSpec{
-			JobID:            jobID,
-			Spans:            partition.Spans,
-			DefaultURI:       defaultURI,
-			URIsByLocalityKV: urisByLocalityKV,
-			MVCCFilter:       mvccFilter,
-			Encryption:       fileEncryption,
-			PKIDs:            pkIDs,
-			BackupStartTime:  startTime,
-			BackupEndTime:    endTime,
-			UserProto:        user.EncodeProto(),
-			ElidePrefix:      elide,
+			JobID:                  jobID,
+			Spans:                  partition.Spans,
+			DefaultURI:             defaultURI,
+			URIsByLocalityKV:       urisByLocalityKV,
+			MVCCFilter:             mvccFilter,
+			Encryption:             fileEncryption,
+			PKIDs:                  pkIDs,
+			BackupStartTime:        startTime,
+			BackupEndTime:          endTime,
+			UserProto:              user.EncodeProto(),
+			ElidePrefix:            elide,
+			IncludeMVCCValueHeader: includeValueHeader,
 		}
 		sqlInstanceIDToSpec[partition.SQLInstanceID] = spec
 	}
@@ -121,16 +123,17 @@ func distBackupPlanSpecs(
 		// which is not the leaseholder for any of the spans, but is for an
 		// introduced span.
 		spec := &execinfrapb.BackupDataSpec{
-			JobID:            jobID,
-			IntroducedSpans:  partition.Spans,
-			DefaultURI:       defaultURI,
-			URIsByLocalityKV: urisByLocalityKV,
-			MVCCFilter:       mvccFilter,
-			Encryption:       fileEncryption,
-			PKIDs:            pkIDs,
-			BackupStartTime:  startTime,
-			BackupEndTime:    endTime,
-			UserProto:        user.EncodeProto(),
+			JobID:                  jobID,
+			IntroducedSpans:        partition.Spans,
+			DefaultURI:             defaultURI,
+			URIsByLocalityKV:       urisByLocalityKV,
+			MVCCFilter:             mvccFilter,
+			Encryption:             fileEncryption,
+			PKIDs:                  pkIDs,
+			BackupStartTime:        startTime,
+			BackupEndTime:          endTime,
+			UserProto:              user.EncodeProto(),
+			IncludeMVCCValueHeader: includeValueHeader,
 		}
 		sqlInstanceIDToSpec[partition.SQLInstanceID] = spec
 	}
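
Taken together, the first three files move the gate from backup execution time to planning time: backup_job.go evaluates it once against the version recorded in the backup manifest, distBackupPlanSpecs stamps it into every BackupDataSpec, and the processor just reads the spec. Below is a minimal, self-contained sketch of that pattern with toy types (not the CockroachDB API), to make the data flow concrete:

    package main

    import "fmt"

    // version is a toy stand-in for a cluster version.
    type version struct{ major, minor int }

    // atLeast reports whether v is at or beyond gate.
    func (v version) atLeast(gate version) bool {
        return v.major > gate.major || (v.major == gate.major && v.minor >= gate.minor)
    }

    // backupSpec models the planning-time spec handed to each processor.
    type backupSpec struct {
        spans                  []string
        includeMVCCValueHeader bool
    }

    // planBackup decides the gate once, from the version recorded in the
    // backup manifest, so every processor sees the same answer.
    func planBackup(manifestVersion version, spans []string) backupSpec {
        return backupSpec{
            spans:                  spans,
            includeMVCCValueHeader: manifestVersion.atLeast(version{24, 1}),
        }
    }

    // runProcessor consumes the flag from the spec instead of re-checking
    // a possibly changing cluster setting per span.
    func runProcessor(spec backupSpec) {
        for _, sp := range spec.spans {
            fmt.Printf("export %s includeMVCCValueHeader=%v\n", sp, spec.includeMVCCValueHeader)
        }
    }

    func main() {
        runProcessor(planBackup(version{24, 1}, []string{"a-b", "b-c"}))
    }

The payoff is consistency: a mid-backup upgrade can change what a per-node version check returns, but a bool captured in the spec is the same for every span and every retry.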
6 changes: 4 additions & 2 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -584,10 +584,12 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
 					log.Infof(ctx, "Put %s -> %s", key.Key, value.PrettyPrint())
 				}

-				// NB: Using valueScratch here assumes that
+				// Using valueScratch here assumes that
 				// DecodeValueFromMVCCValue, ClearChecksum, and
 				// InitChecksum don't copy/reallocate the slice they
-				// were given.
+				// were given. We expect that the value.ClearChecksum and
+				// value.InitChecksum calls above have modified
+				// valueScratch.
 				if err := batcher.AddMVCCKey(ctx, key, valueScratch); err != nil {
 					return summary, errors.Wrapf(err, "adding to batch: %s -> %s", key, value.PrettyPrint())
 				}
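
The expanded comment is doing real work: AddMVCCKey is handed valueScratch, a different slice header from value.RawBytes, so correctness depends on ClearChecksum and InitChecksum mutating the shared backing array in place rather than reallocating. A self-contained sketch of that aliasing assumption, with a toy checksum helper standing in for the real functions:

    package main

    import "fmt"

    // setChecksum models a helper like ClearChecksum/InitChecksum that
    // writes into the slice it was given rather than reallocating.
    func setChecksum(buf []byte, sum byte) {
        buf[0] = sum // in-place: visible through every alias of the array
    }

    func main() {
        scratch := make([]byte, 4)
        rawBytes := scratch // second slice header, same backing array

        setChecksum(rawBytes, 0xAB)
        fmt.Printf("%x\n", scratch[0]) // ab: the write is visible via scratch

        // Had the helper reallocated instead (e.g. append beyond capacity
        // moves the data to a new array), scratch would stop seeing updates
        // and stale bytes would be handed to the batcher.
        rawBytes = append(rawBytes, 0x01)
        _ = rawBytes
    }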
9 changes: 8 additions & 1 deletion pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -318,7 +318,14 @@ message BackupDataSpec {
   optional string user_proto = 10 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"];

   optional ElidePrefix elide_prefix = 12 [(gogoproto.nullable) = false];
-  // NEXTID: 13.
+
+  // IncludeMVCCValueHeader indicates whether the backup should be
+  // created with MVCCValueHeaders in the exported data. This should
+  // only be set on backups starting on cluster version 24.1 or
+  // greater.
+  optional bool include_mvcc_value_header = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "IncludeMVCCValueHeader"];
+
+  // NEXTID: 14.
 }

 message RestoreFileSpec {
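
For readers less used to gogoproto options: (gogoproto.nullable) = false makes the generated field a plain bool rather than *bool, and customname overrides the default generated name (which would be IncludeMvccValueHeader). A sketch of the generated shape this implies, not the actual generated file:

    package execinfrapb // sketch of the generated shape only

    // BackupDataSpec (abridged): nullable = false yields a non-pointer
    // field, and customname yields IncludeMVCCValueHeader instead of the
    // default IncludeMvccValueHeader.
    type BackupDataSpec struct {
        IncludeMVCCValueHeader bool
    }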
19 changes: 16 additions & 3 deletions pkg/sql/importer/import_job.go
@@ -408,9 +408,11 @@ func (r *importResumer) prepareTablesForIngestion(
 	var err error
 	var newTableDescs []jobspb.ImportDetails_Table
 	var desc *descpb.TableDescriptor
+
+	useImportEpochs := p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V24_1)
 	for i, table := range details.Tables {
 		if !table.IsNew {
-			desc, err = prepareExistingTablesForIngestion(ctx, txn, descsCol, table.Desc)
+			desc, err = prepareExistingTablesForIngestion(ctx, txn, descsCol, table.Desc, useImportEpochs)
 			if err != nil {
 				return importDetails, err
 			}
@@ -488,7 +490,11 @@ func (r *importResumer) prepareTablesForIngestion(
 // prepareExistingTablesForIngestion prepares descriptors for existing tables
 // being imported into.
 func prepareExistingTablesForIngestion(
-	ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, desc *descpb.TableDescriptor,
+	ctx context.Context,
+	txn *kv.Txn,
+	descsCol *descs.Collection,
+	desc *descpb.TableDescriptor,
+	useImportEpochs bool,
 ) (*descpb.TableDescriptor, error) {
 	if len(desc.Mutations) > 0 {
 		return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String())
@@ -508,7 +514,14 @@ func prepareExistingTablesForIngestion(
 	// Take the table offline for import.
 	// TODO(dt): audit everywhere we get table descs (leases or otherwise) to
 	// ensure that filtering by state handles IMPORTING correctly.
-	importing.OfflineForImport()
+
+	// We only use the new OfflineForImport, which bumps the ImportEpoch,
+	// if the cluster is completely on 24.1.
+	if useImportEpochs {
+		importing.OfflineForImport()
+	} else {
+		importing.SetOffline(tabledesc.OfflineReasonImporting)
+	}

 	// TODO(dt): de-validate all the FKs.
 	if err := descsCol.WriteDesc(
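
The gate here is about mixed-version safety: per the commit's own comment, OfflineForImport both takes the table offline and bumps its ImportEpoch, and pre-24.1 nodes know nothing about the epoch, so those clusters must stay on the legacy SetOffline path. A self-contained sketch of the two transitions using a toy descriptor (not the catalog API):

    package main

    import "fmt"

    // tableDesc is a toy model of a mutable table descriptor.
    type tableDesc struct {
        state         string
        offlineReason string
        importEpoch   uint32
    }

    // offlineForImport models the 24.1 behavior: offline plus an epoch
    // bump, so MVCC values written by this IMPORT can carry the epoch.
    func (t *tableDesc) offlineForImport() {
        t.state = "OFFLINE"
        t.offlineReason = "importing"
        t.importEpoch++
    }

    // setOffline models the legacy behavior: offline only, no epoch.
    func (t *tableDesc) setOffline(reason string) {
        t.state = "OFFLINE"
        t.offlineReason = reason
    }

    func prepareForImport(t *tableDesc, useImportEpochs bool) {
        if useImportEpochs {
            t.offlineForImport()
        } else {
            t.setOffline("importing")
        }
    }

    func main() {
        t := &tableDesc{}
        prepareForImport(t, true)
        fmt.Println(t.state, t.importEpoch) // OFFLINE 1
    }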
9 changes: 5 additions & 4 deletions pkg/sql/importer/import_processor.go
@@ -18,7 +18,6 @@ import (
 	"sync/atomic"
 	"time"

-	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
@@ -392,12 +391,14 @@ func ingestKvs(
 		true /* isPKAdder */)

 	var bulkAdderImportEpoch uint32
-	if flowCtx.Cfg.Settings.Version.IsActive(ctx, clusterversion.V24_1) && len(spec.Tables) == 1 {
-		for _, v := range spec.Tables {
+	for _, v := range spec.Tables {
+		if bulkAdderImportEpoch == 0 {
 			bulkAdderImportEpoch = v.Desc.ImportEpoch
+		} else if bulkAdderImportEpoch != v.Desc.ImportEpoch {
+			return nil, errors.AssertionFailedf("inconsistent import epoch on multi-table import")
 		}
 	}

 	pkIndexAdder, err := flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB.KV(), writeTS, kvserverbase.BulkAdderOptions{
 		Name:                   pkAdderName,
 		DisallowShadowingBelow: writeTS,
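
With the version gate and the len(spec.Tables) == 1 restriction gone, the loop now visits every table, so it must defend against tables that disagree: the first table seeds the epoch and any later mismatch is an assertion failure. A self-contained sketch of that check with toy types:

    package main

    import (
        "errors"
        "fmt"
    )

    type tableSpec struct{ importEpoch uint32 }

    // commonImportEpoch returns the single ImportEpoch shared by all
    // tables, or an error if two tables disagree (mirroring the
    // AssertionFailedf in the diff).
    func commonImportEpoch(tables map[string]tableSpec) (uint32, error) {
        var epoch uint32
        for _, t := range tables {
            if epoch == 0 {
                epoch = t.importEpoch
            } else if epoch != t.importEpoch {
                return 0, errors.New("inconsistent import epoch on multi-table import")
            }
        }
        return epoch, nil
    }

    func main() {
        epoch, err := commonImportEpoch(map[string]tableSpec{"a": {3}, "b": {3}})
        fmt.Println(epoch, err) // 3 <nil>
    }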
12 changes: 6 additions & 6 deletions pkg/storage/mvcc_value.go
@@ -134,17 +134,17 @@ func (v MVCCValue) SafeFormat(w redact.SafePrinter, _ rune) {
 	w.Print(v.Value.PrettyPrint())
 }

-// EncodeMVCCValueForExport strips fields from the MVCCValueHeader that
-// should not get exported out of the cluster.
+// EncodeMVCCValueForExport encodes fields from the MVCCValueHeader
+// that are appropriate for export out of the cluster.
 func EncodeMVCCValueForExport(mvccValue MVCCValue) ([]byte, error) {
 	// Consider a fast path, where only the roachpb.Value gets exported.
 	// Currently, this only occurs if the value was not imported.
 	if mvccValue.ImportEpoch == 0 {
 		return mvccValue.Value.RawBytes, nil
 	}

-	// Manually strip off any non-exportable fields, and re-encode the mvcc value.
-	mvccValue.MVCCValueHeader.LocalTimestamp = hlc.ClockTimestamp{}
+	// We only export ImportEpoch.
+	mvccValue.MVCCValueHeader = enginepb.MVCCValueHeader{
+		ImportEpoch: mvccValue.ImportEpoch,
+	}
 	return EncodeMVCCValue(mvccValue)
 }
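
The rewrite flips the function from a deny-list ("blank out LocalTimestamp") to an allow-list ("rebuild the header with only ImportEpoch"), so any header field added later is stripped by default instead of leaking into exports by accident. A self-contained sketch of the before/after semantics with a toy header type:

    package main

    import "fmt"

    type clockTimestamp struct{ wallTime int64 }

    // mvccValueHeader is a toy model of enginepb.MVCCValueHeader.
    type mvccValueHeader struct {
        localTimestamp clockTimestamp
        importEpoch    uint32
        // future fields would land here and be stripped automatically
    }

    // exportHeader keeps only ImportEpoch, the one field that should
    // survive export; everything else is dropped by construction.
    func exportHeader(h mvccValueHeader) mvccValueHeader {
        return mvccValueHeader{importEpoch: h.importEpoch}
    }

    func main() {
        h := mvccValueHeader{localTimestamp: clockTimestamp{12345}, importEpoch: 2}
        fmt.Printf("%+v\n", exportHeader(h)) // {localTimestamp:{wallTime:0} importEpoch:2}
    }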
20 changes: 20 additions & 0 deletions pkg/storage/mvcc_value_test.go
@@ -309,6 +309,26 @@ func BenchmarkEncodeMVCCValue(b *testing.B) {
 	}
 }

+func BenchmarkEncodeMVCCValueForExport(b *testing.B) {
+	DisableMetamorphicSimpleValueEncoding(b)
+	headers, values := mvccValueBenchmarkConfigs()
+	for hDesc, h := range headers {
+		for vDesc, v := range values {
+			name := fmt.Sprintf("header=%s/value=%s", hDesc, vDesc)
+			mvccValue := MVCCValue{MVCCValueHeader: h, Value: v}
+			b.Run(name, func(b *testing.B) {
+				for i := 0; i < b.N; i++ {
+					res, err := EncodeMVCCValueForExport(mvccValue)
+					if err != nil { // for performance
+						require.NoError(b, err)
+					}
+					_ = res
+				}
+			})
+		}
+	}
+}
+
 func BenchmarkDecodeMVCCValue(b *testing.B) {
 	headers, values := mvccValueBenchmarkConfigs()
 	for hDesc, h := range headers {
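
Assuming the standard Go benchmark tooling applies to this package, the new benchmark can be run in isolation with something like:

    go test ./pkg/storage -run '^$' -bench BenchmarkEncodeMVCCValueForExport

Note the "if err != nil { require.NoError(b, err) }" pattern: as the "for performance" comment indicates, it keeps the require call out of the hot loop on the success path, so the measurement reflects encoding cost rather than assertion overhead.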
