Skip to content

Commit

Permalink
Merge #119673
Browse files Browse the repository at this point in the history
119673: backup,restore: support prefix-free backups r=msbutler a=dt

This extends 24.1 backups to be produced with the tenant ID and table ID prefix on every key stored in the backup elided from the stored keys. The elided prefix can be derived from the prefix of the span being restored so it can be elided from the individual keys with no loss of information.

Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
craig[bot] and dt committed Mar 3, 2024
2 parents 12c4be9 + 9317ca7 commit 660a3c8
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 64 deletions.
18 changes: 18 additions & 0 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ var useBulkOracle = settings.RegisterBoolSetting(
"randomize the selection of which replica backs up each range",
true)

var elidePrefixes = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"bulkio.backup.elide_common_prefix.enabled",
"remove common prefixes from backup file",
true)

func countRows(raw kvpb.BulkOpSummary, pkIDs map[uint64]bool) roachpb.RowCount {
res := roachpb.RowCount{DataSize: raw.DataSize}
for id, count := range raw.EntryCounts {
Expand Down Expand Up @@ -237,6 +243,7 @@ func backup(
kvpb.MVCCFilter(backupManifest.MVCCFilter),
backupManifest.StartTime,
backupManifest.EndTime,
backupManifest.ElidedPrefix,
)
if err != nil {
return roachpb.RowCount{}, 0, err
Expand Down Expand Up @@ -1620,6 +1627,16 @@ func createBackupManifest(
if jobDetails.FullCluster {
coverage = tree.AllDescriptors
}
elide := execinfrapb.ElidePrefix_None
if len(prevBackups) > 0 {
elide = prevBackups[0].ElidedPrefix
} else if execCfg.Settings.Version.IsActive(ctx, clusterversion.V24_1) && elidePrefixes.Get(&execCfg.Settings.SV) {
if len(tenants) > 0 {
elide = execinfrapb.ElidePrefix_Tenant
} else {
elide = execinfrapb.ElidePrefix_TenantAndTable
}
}

backupManifest := backuppb.BackupManifest{
StartTime: startTime,
Expand All @@ -1637,6 +1654,7 @@ func createBackupManifest(
ClusterID: execCfg.NodeInfo.LogicalClusterID(),
StatisticsFilenames: statsFiles,
DescriptorCoverage: coverage,
ElidedPrefix: elide,
}
if err := checkCoverage(ctx, backupManifest.Spans, append(prevBackups, backupManifest)); err != nil {
return backuppb.BackupManifest{}, errors.Wrap(err, "new backup would not cover expected time")
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ func runBackupProcessor(
logClose(ctx, sink, "SST sink")
}()

sink.elideMode = spec.ElidePrefix

// priority becomes true when we're sending re-attempts of reads far enough
// in the past that we want to run them with priority.
var priority bool
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func distBackupPlanSpecs(
kmsEnv cloud.KMSEnv,
mvccFilter kvpb.MVCCFilter,
startTime, endTime hlc.Timestamp,
elide execinfrapb.ElidePrefix,
) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs")
Expand Down Expand Up @@ -107,6 +108,7 @@ func distBackupPlanSpecs(
BackupStartTime: startTime,
BackupEndTime: endTime,
UserProto: user.EncodeProto(),
ElidePrefix: elide,
}
sqlInstanceIDToSpec[partition.SQLInstanceID] = spec
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backuppb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_proto_library(
"//pkg/multitenant/mtinfopb",
"//pkg/roachpb",
"//pkg/sql/catalog/descpb",
"//pkg/sql/execinfrapb", # keep
"//pkg/sql/stats",
"//pkg/util/hlc",
"@com_github_gogo_protobuf//gogoproto",
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/backupccl/backuppb/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ message BackupManifest {
// since all backups in 23.1+ will write slim manifests.
bool has_external_manifest_ssts = 27 [(gogoproto.customname) = "HasExternalManifestSSTs"];

// NEXT ID: 28
int32 elided_prefix = 28 [(gogoproto.nullable) = false,
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb.ElidePrefix"];

// NEXT ID: 29.
}

message BackupPartitionDescriptor{
Expand Down
50 changes: 47 additions & 3 deletions pkg/ccl/backupccl/file_sst_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package backupccl

import (
"bytes"
"context"
"fmt"
io "io"
Expand Down Expand Up @@ -61,6 +62,9 @@ type fileSSTSink struct {
// flush. This counter resets on each flush.
completedSpans int32

elideMode execinfrapb.ElidePrefix
elidePrefix roachpb.Key

// stats contain statistics about the actions of the fileSSTSink over its
// entire lifespan.
stats struct {
Expand Down Expand Up @@ -163,6 +167,7 @@ func (s *fileSSTSink) flushFile(ctx context.Context) error {
}

s.flushedFiles = nil
s.elidePrefix = s.elidePrefix[:0]
s.flushedSize = 0
s.flushedRevStart.Reset()
s.completedSpans = 0
Expand Down Expand Up @@ -208,11 +213,16 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) error {

span := resp.metadata.Span

spanPrefix, err := elidedPrefix(span.Key, s.elideMode)
if err != nil {
return err
}

// If this span starts before the last buffered span ended, we need to flush
// since it overlaps but SSTWriter demands writes in-order.
if len(s.flushedFiles) > 0 {
last := s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey
if span.Key.Compare(last) < 0 {
if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidePrefix) {
log.VEventf(ctx, 1, "flushing backup file %s of size %d because span %s cannot append before %s",
s.outName, s.flushedSize, span, last,
)
Expand All @@ -229,6 +239,7 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) error {
return err
}
}
s.elidePrefix = append(s.elidePrefix[:0], spanPrefix...)

log.VEventf(ctx, 2, "writing %s to backup file %s", span, s.outName)

Expand Down Expand Up @@ -299,6 +310,12 @@ func (s *fileSSTSink) copyPointKeys(dataSST []byte) error {
break
}
k := iter.UnsafeKey()
suffix, ok := bytes.CutPrefix(k.Key, s.elidePrefix)
if !ok {
return errors.AssertionFailedf("prefix mismatch %q does not have %q", k.Key, s.elidePrefix)
}
k.Key = suffix

v, err := iter.UnsafeValue()
if err != nil {
return err
Expand All @@ -308,7 +325,7 @@ func (s *fileSSTSink) copyPointKeys(dataSST []byte) error {
return err
}
} else {
if err := s.sst.PutRawMVCC(iter.UnsafeKey(), v); err != nil {
if err := s.sst.PutRawMVCC(k, v); err != nil {
return err
}
}
Expand Down Expand Up @@ -336,7 +353,15 @@ func (s *fileSSTSink) copyRangeKeys(dataSST []byte) error {
}
rangeKeys := iter.RangeKeys()
for _, v := range rangeKeys.Versions {
if err := s.sst.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), v.Value); err != nil {
rk := rangeKeys.AsRangeKey(v)
var ok bool
if rk.StartKey, ok = bytes.CutPrefix(rk.StartKey, s.elidePrefix); !ok {
return errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.StartKey, s.elidePrefix)
}
if rk.EndKey, ok = bytes.CutPrefix(rk.EndKey, s.elidePrefix); !ok {
return errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.EndKey, s.elidePrefix)
}
if err := s.sst.PutRawMVCCRangeKey(rk, v.Value); err != nil {
return err
}
}
Expand All @@ -350,3 +375,22 @@ func generateUniqueSSTName(nodeID base.SQLInstanceID) string {
return fmt.Sprintf("data/%d.sst",
builtins.GenerateUniqueInt(builtins.ProcessUniqueID(nodeID)))
}

func elidedPrefix(key roachpb.Key, mode execinfrapb.ElidePrefix) ([]byte, error) {
switch mode {
case execinfrapb.ElidePrefix_TenantAndTable:
rest, err := keys.StripTablePrefix(key)
if err != nil {
return nil, err
}
return key[: len(key)-len(rest) : len(key)-len(rest)], nil

case execinfrapb.ElidePrefix_Tenant:
rest, err := keys.StripTenantPrefix(key)
if err != nil {
return nil, err
}
return key[: len(key)-len(rest) : len(key)-len(rest)], nil
}
return nil, nil
}
Loading

0 comments on commit 660a3c8

Please sign in to comment.