diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go
index ca813d4c1909..bb20648c59db 100644
--- a/pkg/ccl/backupccl/backup_job.go
+++ b/pkg/ccl/backupccl/backup_job.go
@@ -95,6 +95,12 @@ var useBulkOracle = settings.RegisterBoolSetting(
 	"randomize the selection of which replica backs up each range",
 	true)
 
+var elidePrefixes = settings.RegisterBoolSetting(
+	settings.ApplicationLevel,
+	"bulkio.backup.elide_common_prefix.enabled",
+	"remove common prefixes from backup files",
+	true)
+
 func countRows(raw kvpb.BulkOpSummary, pkIDs map[uint64]bool) roachpb.RowCount {
 	res := roachpb.RowCount{DataSize: raw.DataSize}
 	for id, count := range raw.EntryCounts {
@@ -237,6 +243,7 @@
 		kvpb.MVCCFilter(backupManifest.MVCCFilter),
 		backupManifest.StartTime,
 		backupManifest.EndTime,
+		backupManifest.ElidedPrefix,
 	)
 	if err != nil {
 		return roachpb.RowCount{}, 0, err
@@ -1620,6 +1627,16 @@
 	if jobDetails.FullCluster {
 		coverage = tree.AllDescriptors
 	}
+	elide := execinfrapb.ElidePrefix_None
+	if len(prevBackups) > 0 {
+		elide = prevBackups[0].ElidedPrefix
+	} else if execCfg.Settings.Version.IsActive(ctx, clusterversion.V24_1) && elidePrefixes.Get(&execCfg.Settings.SV) {
+		if len(tenants) > 0 {
+			elide = execinfrapb.ElidePrefix_Tenant
+		} else {
+			elide = execinfrapb.ElidePrefix_TenantAndTable
+		}
+	}
 
 	backupManifest := backuppb.BackupManifest{
 		StartTime:           startTime,
@@ -1637,6 +1654,7 @@
 		ClusterID:           execCfg.NodeInfo.LogicalClusterID(),
 		StatisticsFilenames: statsFiles,
 		DescriptorCoverage:  coverage,
+		ElidedPrefix:        elide,
 	}
 	if err := checkCoverage(ctx, backupManifest.Spans, append(prevBackups, backupManifest)); err != nil {
 		return backuppb.BackupManifest{}, errors.Wrap(err, "new backup would not cover expected time")
diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index f3ab26b9afb6..e636ff99d309 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -466,6 +466,8 @@ func runBackupProcessor(
 		logClose(ctx, sink, "SST sink")
 	}()
 
+	sink.elideMode = spec.ElidePrefix
+
 	// priority becomes true when we're sending re-attempts of reads far enough
 	// in the past that we want to run them with priority.
 	var priority bool
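Reviewer note: the createBackupManifest hunk above is the only place the elide mode is chosen, and the precedence matters: an incremental backup always inherits the mode of the chain it extends (presumably so every file in one chain encodes keys the same way), and only a new full backup on a fully-upgraded 24.1 cluster consults the setting. A minimal standalone sketch of the same rule, with hypothetical names, not part of this patch:

    package main

    import "fmt"

    type elideMode int

    const (
        elideNone elideMode = iota
        elideTenant
        elideTenantAndTable
    )

    // chooseMode restates the selection in createBackupManifest: incrementals
    // inherit the base backup's mode; a new full backup elides only the tenant
    // prefix when tenant keyspaces are included, and the tenant-and-table
    // prefix otherwise.
    func chooseMode(prevMode *elideMode, settingOnAndV241 bool, hasTenants bool) elideMode {
        if prevMode != nil {
            return *prevMode // never mix modes within one backup chain
        }
        if !settingOnAndV241 {
            return elideNone
        }
        if hasTenants {
            return elideTenant
        }
        return elideTenantAndTable
    }

    func main() {
        fmt.Println(chooseMode(nil, true, false) == elideTenantAndTable) // true
    }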
diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go
index 71ea019d579d..9edcccb37f59 100644
--- a/pkg/ccl/backupccl/backup_processor_planning.go
+++ b/pkg/ccl/backupccl/backup_processor_planning.go
@@ -45,6 +45,7 @@ func distBackupPlanSpecs(
 	kmsEnv cloud.KMSEnv,
 	mvccFilter kvpb.MVCCFilter,
 	startTime, endTime hlc.Timestamp,
+	elide execinfrapb.ElidePrefix,
 ) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) {
 	var span *tracing.Span
 	ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs")
@@ -107,6 +108,7 @@
 			BackupStartTime: startTime,
 			BackupEndTime:   endTime,
 			UserProto:       user.EncodeProto(),
+			ElidePrefix:     elide,
 		}
 		sqlInstanceIDToSpec[partition.SQLInstanceID] = spec
 	}
diff --git a/pkg/ccl/backupccl/backuppb/BUILD.bazel b/pkg/ccl/backupccl/backuppb/BUILD.bazel
index 1c59699256fb..f833f57eae83 100644
--- a/pkg/ccl/backupccl/backuppb/BUILD.bazel
+++ b/pkg/ccl/backupccl/backuppb/BUILD.bazel
@@ -31,6 +31,7 @@ go_proto_library(
        "//pkg/multitenant/mtinfopb",
        "//pkg/roachpb",
        "//pkg/sql/catalog/descpb",
+       "//pkg/sql/execinfrapb",  # keep
        "//pkg/sql/stats",
        "//pkg/util/hlc",
        "@com_github_gogo_protobuf//gogoproto",
diff --git a/pkg/ccl/backupccl/backuppb/backup.proto b/pkg/ccl/backupccl/backuppb/backup.proto
index 2b7b3057017b..35d5525711f7 100644
--- a/pkg/ccl/backupccl/backuppb/backup.proto
+++ b/pkg/ccl/backupccl/backuppb/backup.proto
@@ -149,7 +149,10 @@ message BackupManifest {
  // since all backups in 23.1+ will write slim manifests.
  bool has_external_manifest_ssts = 27 [(gogoproto.customname) = "HasExternalManifestSSTs"];

-  // NEXT ID: 28
+  int32 elided_prefix = 28 [(gogoproto.nullable) = false,
+    (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb.ElidePrefix"];
+
+  // NEXT ID: 29.
 }

 message BackupPartitionDescriptor{
diff --git a/pkg/ccl/backupccl/file_sst_sink.go b/pkg/ccl/backupccl/file_sst_sink.go
index 6f419c394b2e..b494b40ae880 100644
--- a/pkg/ccl/backupccl/file_sst_sink.go
+++ b/pkg/ccl/backupccl/file_sst_sink.go
@@ -9,6 +9,7 @@
 package backupccl
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	io "io"
@@ -61,6 +62,9 @@ type fileSSTSink struct {
 	// flush. This counter resets on each flush.
 	completedSpans int32
 
+	elideMode   execinfrapb.ElidePrefix
+	elidePrefix roachpb.Key
+
 	// stats contain statistics about the actions of the fileSSTSink over its
 	// entire lifespan.
 	stats struct {
@@ -163,6 +167,7 @@ func (s *fileSSTSink) flushFile(ctx context.Context) error {
 	}
 
 	s.flushedFiles = nil
+	s.elidePrefix = s.elidePrefix[:0]
 	s.flushedSize = 0
 	s.flushedRevStart.Reset()
 	s.completedSpans = 0
@@ -208,11 +213,16 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) error {
 	span := resp.metadata.Span
 
+	spanPrefix, err := elidedPrefix(span.Key, s.elideMode)
+	if err != nil {
+		return err
+	}
+
 	// If this span starts before the last buffered span ended, we need to flush
 	// since it overlaps but SSTWriter demands writes in-order.
 	if len(s.flushedFiles) > 0 {
 		last := s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey
-		if span.Key.Compare(last) < 0 {
+		if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidePrefix) {
 			log.VEventf(ctx, 1, "flushing backup file %s of size %d because span %s cannot append before %s",
 				s.outName, s.flushedSize, span, last,
 			)
@@ -229,6 +239,7 @@ func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) error {
 			return err
 		}
 	}
+	s.elidePrefix = append(s.elidePrefix[:0], spanPrefix...)
 
 	log.VEventf(ctx, 2, "writing %s to backup file %s", span, s.outName)
 
@@ -299,6 +310,12 @@ func (s *fileSSTSink) copyPointKeys(dataSST []byte) error {
 			break
 		}
 		k := iter.UnsafeKey()
+		suffix, ok := bytes.CutPrefix(k.Key, s.elidePrefix)
+		if !ok {
+			return errors.AssertionFailedf("prefix mismatch %q does not have %q", k.Key, s.elidePrefix)
+		}
+		k.Key = suffix
+
 		v, err := iter.UnsafeValue()
 		if err != nil {
 			return err
@@ -308,7 +325,7 @@ func (s *fileSSTSink) copyPointKeys(dataSST []byte) error {
 				return err
 			}
 		} else {
-			if err := s.sst.PutRawMVCC(iter.UnsafeKey(), v); err != nil {
+			if err := s.sst.PutRawMVCC(k, v); err != nil {
 				return err
 			}
 		}
@@ -336,7 +353,15 @@ func (s *fileSSTSink) copyRangeKeys(dataSST []byte) error {
 		}
 		rangeKeys := iter.RangeKeys()
 		for _, v := range rangeKeys.Versions {
-			if err := s.sst.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), v.Value); err != nil {
+			rk := rangeKeys.AsRangeKey(v)
+			var ok bool
+			if rk.StartKey, ok = bytes.CutPrefix(rk.StartKey, s.elidePrefix); !ok {
+				return errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.StartKey, s.elidePrefix)
+			}
+			if rk.EndKey, ok = bytes.CutPrefix(rk.EndKey, s.elidePrefix); !ok {
+				return errors.AssertionFailedf("prefix mismatch %q does not have %q", rk.EndKey, s.elidePrefix)
+			}
+			if err := s.sst.PutRawMVCCRangeKey(rk, v.Value); err != nil {
 				return err
 			}
 		}
@@ -350,3 +375,22 @@ func generateUniqueSSTName(nodeID base.SQLInstanceID) string {
 	return fmt.Sprintf("data/%d.sst",
 		builtins.GenerateUniqueInt(builtins.ProcessUniqueID(nodeID)))
 }
+
+func elidedPrefix(key roachpb.Key, mode execinfrapb.ElidePrefix) ([]byte, error) {
+	switch mode {
+	case execinfrapb.ElidePrefix_TenantAndTable:
+		rest, err := keys.StripTablePrefix(key)
+		if err != nil {
+			return nil, err
+		}
+		return key[: len(key)-len(rest) : len(key)-len(rest)], nil
+
+	case execinfrapb.ElidePrefix_Tenant:
+		rest, err := keys.StripTenantPrefix(key)
+		if err != nil {
+			return nil, err
+		}
+		return key[: len(key)-len(rest) : len(key)-len(rest)], nil
+	}
+	return nil, nil
+}
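Reviewer note: elidedPrefix returns the full slice expression key[:n:n] rather than key[:n]. A self-contained sketch of why capping the capacity matters — appending to the returned prefix can never overwrite the key bytes it was sliced from (illustrative only, not part of this patch):

    package main

    import "fmt"

    func main() {
        key := []byte("/Tenant/10/Table/52/k1")
        n := len("/Tenant/10/")

        // Full slice expression: len(prefix) == cap(prefix) == n, so any
        // append must allocate a fresh backing array instead of scribbling
        // over key[n:].
        prefix := key[:n:n]
        _ = append(prefix, "XXX"...)

        fmt.Printf("%s\n", key) // still /Tenant/10/Table/52/k1
    }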
diff --git a/pkg/ccl/backupccl/file_sst_sink_test.go b/pkg/ccl/backupccl/file_sst_sink_test.go
index e272ab18ca5a..cc24aa78ed51 100644
--- a/pkg/ccl/backupccl/file_sst_sink_test.go
+++ b/pkg/ccl/backupccl/file_sst_sink_test.go
@@ -13,6 +13,8 @@ import (
 	"context"
 	"fmt"
 	"reflect"
+	"strconv"
+	"strings"
 	"testing"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
@@ -130,10 +132,11 @@ func TestFileSSTSinkWrite(t *testing.T) {
 	ctx := context.Background()
 
 	type testCase struct {
-		name           string
-		exportSpans    []exportedSpan
-		flushedSpans   []roachpb.Spans
-		unflushedSpans []roachpb.Spans
+		name              string
+		exportSpans       []exportedSpan
+		flushedSpans      []roachpb.Spans
+		elideFlushedSpans []roachpb.Spans
+		unflushedSpans    []roachpb.Spans
 		// errorExplanation, if non-empty, explains why an error is expected when
 		// writing the case inputs, and makes the test case fail if none is hit.
 		errorExplanation string
@@ -166,6 +169,23 @@ func TestFileSSTSinkWrite(t *testing.T) {
 			unflushedSpans:   []roachpb.Spans{{roachpb.Span{Key: []byte("c"), EndKey: []byte("e")}}},
 			errorExplanation: "unsupported write ordering; backup processor should not do this due to one sink per worker and #118990.",
 		},
+		{
+			name: "prefix-differ",
+			exportSpans: []exportedSpan{
+				newExportedSpanBuilder("2/a", "2/c", false).withKVs([]kvAndTS{{key: "2/a", timestamp: 10}, {key: "2/c", timestamp: 10}}).build(),
+				newExportedSpanBuilder("2/c", "2/d", true).withKVs([]kvAndTS{{key: "2/c", timestamp: 9}, {key: "2/d", timestamp: 10}}).build(),
+				newExportedSpanBuilder("3/c", "3/e", true).withKVs([]kvAndTS{{key: "3/c", timestamp: 9}, {key: "3/d", timestamp: 10}}).build(),
+				newExportedSpanBuilder("2/e", "2/g", true).withKVs([]kvAndTS{{key: "2/e", timestamp: 10}, {key: "2/f", timestamp: 10}}).build(),
+			},
+			flushedSpans: []roachpb.Spans{
+				{roachpb.Span{Key: []byte("2/a"), EndKey: []byte("2/d")}, roachpb.Span{Key: []byte("3/c"), EndKey: []byte("3/e")}},
+			},
+			elideFlushedSpans: []roachpb.Spans{
+				{roachpb.Span{Key: []byte("2/a"), EndKey: []byte("2/d")}},
+				{roachpb.Span{Key: []byte("3/c"), EndKey: []byte("3/e")}},
+			},
+			unflushedSpans: []roachpb.Spans{{roachpb.Span{Key: []byte("2/e"), EndKey: []byte("2/g")}}},
+		},
 		{
 			name: "extend-key-boundary-1-file",
 			exportSpans: []exportedSpan{
@@ -271,57 +291,92 @@ func TestFileSSTSinkWrite(t *testing.T) {
 			},
 		},
 	} {
-		t.Run(tt.name, func(t *testing.T) {
-			if tt.errorExplanation != "" {
-				return
+		for i := range tt.flushedSpans {
+			for j, sp := range tt.flushedSpans[i] {
+				tt.flushedSpans[i][j].Key = s2k(string(sp.Key))
+				tt.flushedSpans[i][j].EndKey = s2k(string(sp.EndKey))
 			}
-			st := cluster.MakeTestingClusterSettings()
-			targetFileSize.Override(ctx, &st.SV, 10<<10)
-
-			sink, store := fileSSTSinkTestSetUp(ctx, t, st)
-			defer func() {
-				require.NoError(t, sink.Close())
-			}()
-
-			for _, es := range tt.exportSpans {
-				require.NoError(t, sink.write(ctx, es))
+		}
+		for i := range tt.elideFlushedSpans {
+			for j, sp := range tt.elideFlushedSpans[i] {
+				tt.elideFlushedSpans[i][j].Key = s2k(string(sp.Key))
+				tt.elideFlushedSpans[i][j].EndKey = s2k(string(sp.EndKey))
+			}
+		}
+		for i := range tt.unflushedSpans {
+			for j, sp := range tt.unflushedSpans[i] {
+				tt.unflushedSpans[i][j].Key = s2k(string(sp.Key))
+				tt.unflushedSpans[i][j].EndKey = s2k(string(sp.EndKey))
 			}
+		}
 
-			progress := make([]backuppb.BackupManifest_File, 0)
+		for _, elide := range []execinfrapb.ElidePrefix{execinfrapb.ElidePrefix_None, execinfrapb.ElidePrefix_TenantAndTable} {
+			t.Run(fmt.Sprintf("%s/elide=%s", tt.name, elide), func(t *testing.T) {
+				if tt.errorExplanation != "" {
+					return
+				}
+				st := cluster.MakeTestingClusterSettings()
+				targetFileSize.Override(ctx, &st.SV, 10<<10)
 
-		Loop:
-			for {
-				select {
-				case p := <-sink.conf.progCh:
-					var progDetails backuppb.BackupManifest_Progress
-					if err := types.UnmarshalAny(&p.ProgressDetails, &progDetails); err != nil {
-						t.Fatal(err)
-					}
+				sink, store := fileSSTSinkTestSetUp(ctx, t, st)
+				defer func() {
+					require.NoError(t, sink.Close())
+				}()
+				sink.elideMode = elide
 
-					progress = append(progress, progDetails.Files...)
-				default:
-					break Loop
+				for _, es := range tt.exportSpans {
+					require.NoError(t, sink.write(ctx, es))
 				}
-			}
-			// progCh contains the files that have already been created with
-			// flushes. Verify the contents.
-			require.NoError(t, checkFiles(ctx, store, progress, tt.flushedSpans))
-
-			// flushedFiles contain the files that are in queue to be created on the
-			// next flush. Save these and then flush the sink to check their contents.
-			var actualUnflushedFiles []backuppb.BackupManifest_File
-			actualUnflushedFiles = append(actualUnflushedFiles, sink.flushedFiles...)
-			// We cannot end the test -- by calling flush -- if the sink is mid-key.
-			if len(tt.exportSpans) > 0 && !tt.exportSpans[len(tt.exportSpans)-1].atKeyBoundary {
-				sink.writeWithNoData(newExportedSpanBuilder("z", "zz", true).build())
-			}
-			require.NoError(t, sink.flush(ctx))
-			require.NoError(t, checkFiles(ctx, store, actualUnflushedFiles, tt.unflushedSpans))
-			require.Empty(t, sink.flushedFiles)
-		})
+				progress := make([]backuppb.BackupManifest_File, 0)
+
+			Loop:
+				for {
+					select {
+					case p := <-sink.conf.progCh:
+						var progDetails backuppb.BackupManifest_Progress
+						if err := types.UnmarshalAny(&p.ProgressDetails, &progDetails); err != nil {
+							t.Fatal(err)
+						}
+
+						progress = append(progress, progDetails.Files...)
+					default:
+						break Loop
+					}
+				}
+				expectedSpans := tt.flushedSpans
+				eliding := sink.elideMode != execinfrapb.ElidePrefix_None
+				if eliding && len(tt.elideFlushedSpans) > 0 {
+					expectedSpans = tt.elideFlushedSpans
+				}
+				// progCh contains the files that have already been created with
+				// flushes. Verify the contents.
+				require.NoError(t, checkFiles(ctx, store, progress, expectedSpans, eliding))
+
+				// flushedFiles contain the files that are in queue to be created on the
+				// next flush. Save these and then flush the sink to check their contents.
+				var actualUnflushedFiles []backuppb.BackupManifest_File
+				actualUnflushedFiles = append(actualUnflushedFiles, sink.flushedFiles...)
+				// We cannot end the test -- by calling flush -- if the sink is mid-key.
+				if len(tt.exportSpans) > 0 && !tt.exportSpans[len(tt.exportSpans)-1].atKeyBoundary {
+					sink.writeWithNoData(newExportedSpanBuilder("z", "zz", true).build())
+				}
+				require.NoError(t, sink.flush(ctx))
+				require.NoError(t, checkFiles(ctx, store, actualUnflushedFiles, tt.unflushedSpans, eliding))
+				require.Empty(t, sink.flushedFiles)
+			})
+		}
 	}
 }
+
+func s2k(s string) roachpb.Key {
+	tbl := 1
+	k := []byte(s)
+	if p := strings.Split(s, "/"); len(p) > 1 {
+		tbl, _ = strconv.Atoi(p[0])
+		k = []byte(p[1])
+	}
+	return append(keys.SystemSQLCodec.IndexPrefix(uint32(tbl), 2), k...)
+}
 
 // TestFileSSTSinkStats tests the internal counters and stats of the FileSSTSink under
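Reviewer note: s2k maps the human-readable test keys onto real SQL-codec keys; an optional "N/" prefix selects the table ID (default 1), and the index ID is always 2. This is why the "2/..." and "3/..." spans in the "prefix-differ" case land in distinct table keyspaces and exercise the new prefix-change flush in write(). Illustrative usage of the helper above (fragment, not part of this patch):

    k1 := s2k("c")   // table 1, index 2: keys.SystemSQLCodec.IndexPrefix(1, 2) + "c"
    k2 := s2k("3/c") // table 3, index 2: keys.SystemSQLCodec.IndexPrefix(3, 2) + "c"
    _ = bytes.HasPrefix(k2, keys.SystemSQLCodec.IndexPrefix(3, 2)) // true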
@@ -530,7 +585,11 @@ func TestFileSSTSinkCopyPointKeys(t *testing.T) {
 
 		var expected []kvAndTS
 		for _, input := range tt.inputs {
+			for i := range input.input {
+				input.input[i].key = string(s2k(input.input[i].key))
+			}
 			expected = append(expected, input.input...)
+
 		}
 
 		iterOpts := storage.IterOptions{
@@ -668,7 +727,7 @@ func TestFileSSTSinkCopyRangeKeys(t *testing.T) {
 			// Add some point key values in the input as well.
 			es := newExportedSpanBuilder(rangeKeys[0].key, rangeKeys[len(rangeKeys)-1].key, false).
 				withRangeKeys(rangeKeys).
-				withKVs([]kvAndTS{{rangeKeys[0].key, nil, rangeKeys[0].timestamp}}).
+				withKVs([]kvAndTS{{key: rangeKeys[0].key, timestamp: rangeKeys[0].timestamp}}).
 				build()
 			err := sink.copyRangeKeys(es.dataSST)
 			if input.expectErr != "" {
@@ -692,7 +751,7 @@ func TestFileSSTSinkCopyRangeKeys(t *testing.T) {
 				if expected[rk.timestamp] == nil {
 					expected[rk.timestamp] = &roachpb.SpanGroup{}
 				}
-
+				t.Logf("%d: Adding %v", rk.timestamp, rk.span())
 				expected[rk.timestamp].Add(rk.span())
 			}
 		}
@@ -761,8 +820,8 @@ type rangeKeyAndTS struct {
 
 func (rk rangeKeyAndTS) span() roachpb.Span {
 	return roachpb.Span{
-		Key:    []byte(rk.key),
-		EndKey: []byte(rk.endKey),
+		Key:    s2k(rk.key),
+		EndKey: s2k(rk.endKey),
 	}
 }
 
@@ -777,8 +836,8 @@ func newExportedSpanBuilder(spanStart, spanEnd string, atKeyBoundary bool) *expo
 		es: &exportedSpan{
 			metadata: backuppb.BackupManifest_File{
 				Span: roachpb.Span{
-					Key:    []byte(spanStart),
-					EndKey: []byte(spanEnd),
+					Key:    s2k(spanStart),
+					EndKey: s2k(spanEnd),
 				},
 				EntryCounts: roachpb.RowCount{
 					DataSize: 1,
@@ -825,7 +884,7 @@ func (b *exportedSpanBuilder) build() exportedSpan {
 	sst := storage.MakeBackupSSTWriter(ctx, settings, buf)
 	for _, d := range b.keyValues {
 		err := sst.Put(storage.MVCCKey{
-			Key:       []byte(d.key),
+			Key:       s2k(d.key),
 			Timestamp: hlc.Timestamp{WallTime: d.timestamp},
 		}, d.value)
 		if err != nil {
@@ -877,6 +936,7 @@ func checkFiles(
 	store cloud.ExternalStorage,
 	files []backuppb.BackupManifest_File,
 	expectedFileSpans []roachpb.Spans,
+	elided bool,
 ) error {
 	iterOpts := storage.IterOptions{
 		KeyTypes: storage.IterKeyTypePointsOnly,
@@ -924,7 +984,7 @@ func checkFiles(
 
 			key := iter.UnsafeKey()
 
-			if !endKeyInclusiveSpansContainsKey(spans, key.Key) {
+			if !endKeyInclusiveSpansContainsKey(spans, key.Key, elided) {
 				return errors.Newf("key %v in file %s not contained by its spans [%v]", key.Key, f, spans)
 			}
 		}
@@ -934,8 +994,12 @@ func checkFiles(
 	return nil
 }
 
-func endKeyInclusiveSpansContainsKey(spans roachpb.Spans, key roachpb.Key) bool {
+func endKeyInclusiveSpansContainsKey(spans roachpb.Spans, key roachpb.Key, elided bool) bool {
 	for _, sp := range spans {
+		if elided {
+			sp.Key, _ = keys.StripTablePrefix(sp.Key)
+			sp.EndKey, _ = keys.StripTablePrefix(sp.EndKey)
+		}
 		if sp.ContainsKey(key) {
 			return true
 		}
diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go
index 80074332852a..4253760eab35 100644
--- a/pkg/ccl/backupccl/restore_data_processor.go
+++ b/pkg/ccl/backupccl/restore_data_processor.go
@@ -9,6 +9,7 @@
 package backupccl
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"runtime"
@@ -416,6 +417,7 @@ func (rd *restoreDataProcessor) openSSTs(
 	for ; idx < len(entry.Files); idx++ {
 		file := entry.Files[idx]
+		log.VEventf(ctx, 2, "import file %s which starts at %s", file.Path, entry.Span.Key)
 
 		alloc, err := rd.qp.TryAcquireMaybeIncreaseCapacity(ctx, sstOverheadBytesPerFile)
@@ -559,6 +561,11 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry(
 	iter := sst.iter
 	defer sst.cleanup()
 
+	elidedPrefix, err := elidedPrefix(entry.Span.Key, sst.entry.ElidedPrefix)
+	if err != nil {
+		return summary, err
+	}
+
 	var batcher SSTBatcherExecutor
 	if rd.spec.ValidateOnly {
 		batcher = &sstBatcherNoop{}
@@ -617,19 +624,36 @@
 	startKeyMVCC, endKeyMVCC := storage.MVCCKey{Key: entry.Span.Key}, storage.MVCCKey{Key: entry.Span.EndKey}
 
+	if elidedPrefix != nil {
+		startKeyMVCC.Key = bytes.TrimPrefix(startKeyMVCC.Key, elidedPrefix)
+	}
+	if verbose {
+		log.Infof(ctx, "reading from %s to %s", startKeyMVCC, endKeyMVCC)
+	}
 	for iter.SeekGE(startKeyMVCC); ; iter.NextKey() {
 		ok, err := iter.Valid()
 		if err != nil {
 			return summary, err
 		}
-		if !ok || !iter.UnsafeKey().Less(endKeyMVCC) {
+		if !ok {
+			if verbose {
+				log.Infof(ctx, "iterator exhausted")
+			}
 			break
 		}
 
 		key := iter.UnsafeKey()
-		keyScratch = append(keyScratch[:0], key.Key...)
+		keyScratch = append(append(keyScratch[:0], elidedPrefix...), key.Key...)
 		key.Key = keyScratch
+
+		if !key.Less(endKeyMVCC) {
+			if verbose {
+				log.Infof(ctx, "iterator key %s exceeded end %s", key, endKeyMVCC)
+			}
+			break
+		}
+
 		v, err := iter.UnsafeValue()
 		if err != nil {
 			return summary, err
diff --git a/pkg/ccl/backupccl/restore_online_test.go b/pkg/ccl/backupccl/restore_online_test.go
index f24b6b755923..377e3247da74 100644
--- a/pkg/ccl/backupccl/restore_online_test.go
+++ b/pkg/ccl/backupccl/restore_online_test.go
@@ -45,6 +45,9 @@ func TestOnlineRestoreBasic(t *testing.T) {
 	defer cleanupFn()
 	externalStorage := "nodelocal://1/backup"
 
+	// TODO(dt): remove this when OR supports synthesis.
+	sqlDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.elide_common_prefix.enabled = false`)
+
 	sqlDB.Exec(t, fmt.Sprintf("BACKUP INTO '%s'", externalStorage))
 
 	params := base.TestClusterArgs{
@@ -94,6 +97,9 @@ func TestOnlineRestoreTenant(t *testing.T) {
 	defer cleanupFn()
 	srv := tc.Server(0)
 
+	// TODO(dt): remove this when OR supports synthesis.
+	systemDB.Exec(t, `SET CLUSTER SETTING bulkio.backup.elide_common_prefix.enabled = false`)
+
 	_ = securitytest.EmbeddedTenantIDs()
 
 	_, conn10 := serverutils.StartTenant(t, srv, base.TestTenantArgs{TenantID: roachpb.MustMakeTenantID(10)})
diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go
index 3f38773b72e8..85bfc94bb1dd 100644
--- a/pkg/ccl/backupccl/restore_span_covering.go
+++ b/pkg/ccl/backupccl/restore_span_covering.go
@@ -313,7 +313,8 @@ func generateAndSendImportSpans(
 	flush := func(ctx context.Context) error {
 		entry := execinfrapb.RestoreSpanEntry{
-			Span: lastCovSpan,
+			Span:         lastCovSpan,
+			ElidedPrefix: backups[0].ElidedPrefix,
 		}
 		for layer := range covFilesByLayer {
 			for _, f := range covFilesByLayer[layer] {
diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto
index d504f1871d12..8aed50ae1d9b 100644
--- a/pkg/sql/execinfrapb/processors_bulk_io.proto
+++ b/pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -289,6 +289,12 @@ message StreamIngestionFrontierSpec {
  optional StreamIngestionPartitionSpecs partition_specs = 9 [(gogoproto.nullable) = false];
 }

+enum ElidePrefix {
+  None = 0;
+  Tenant = 1;
+  TenantAndTable = 2;
+}
+
 message BackupDataSpec {
  // TODO(lidor): job_id is not needed when interoperability with 22.2 is
  // dropped, the new way to send the job tag is using 'job_tag' in the
@@ -311,7 +317,8 @@ message BackupDataSpec {
  // when using FileTable ExternalStorage.
  optional string user_proto = 10 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"];

-  // NEXTID: 12.
+  optional ElidePrefix elide_prefix = 12 [(gogoproto.nullable) = false];
+  // NEXTID: 13.
 }

 message RestoreFileSpec {
@@ -323,7 +330,6 @@ message RestoreFileSpec {
  optional roachpb.RowCount backup_file_entry_counts = 6 [(gogoproto.nullable) = false];
  optional uint64 backing_file_size = 7 [(gogoproto.nullable) = false];
  optional uint64 approximate_physical_size = 8 [(gogoproto.nullable) = false];
-  // NEXT ID: 9.
 }

@@ -349,6 +355,7 @@
 message RestoreSpanEntry {
  optional roachpb.Span span = 1 [(gogoproto.nullable) = false];
  repeated RestoreFileSpec files = 2 [(gogoproto.nullable) = false];
  optional int64 progressIdx = 3 [(gogoproto.nullable) = false];
+  optional ElidePrefix elided_prefix = 4 [(gogoproto.nullable) = false];
 }

 message RestoreDataSpec {
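Reviewer note: end to end, the patch depends on the backup-side bytes.CutPrefix in copyPointKeys/copyRangeKeys and the restore-side re-prepend in processRestoreSpanEntry being exact inverses; otherwise restored keys would be corrupted. A self-contained round-trip sketch of that invariant (illustrative only, not part of this patch):

    package main

    import (
        "bytes"
        "fmt"
    )

    func main() {
        prefix := []byte("/Tenant/10/Table/52/")
        orig := append(append([]byte(nil), prefix...), "k1"...)

        // Backup side (copyPointKeys): strip the shared prefix before writing.
        elided, ok := bytes.CutPrefix(orig, prefix)
        if !ok {
            panic("prefix mismatch") // the sink treats this as an assertion failure
        }

        // Restore side (processRestoreSpanEntry): re-prepend the entry's prefix.
        var keyScratch []byte
        keyScratch = append(append(keyScratch[:0], prefix...), elided...)

        fmt.Println(bytes.Equal(orig, keyScratch)) // true
    }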