ccl/backupccl: add new split and scatter processor that generates import spans #94805

Merged 1 commit on Jan 24, 2023.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -16,6 +16,7 @@ go_library(
"backup_telemetry.go",
"create_scheduled_backup.go",
"file_sst_sink.go",
"generative_split_and_scatter_processor.go",
"key_rewriter.go",
"restoration_data.go",
"restore_data_processor.go",
16 changes: 10 additions & 6 deletions pkg/ccl/backupccl/backup_metadata_test.go
@@ -187,18 +187,22 @@ func checkFiles(
 	ctx context.Context, t *testing.T, m *backuppb.BackupManifest, bm *backupinfo.BackupMetadata,
 ) {
 	var metaFiles []backuppb.BackupManifest_File
-	var file backuppb.BackupManifest_File
 	it, err := bm.NewFileIter(ctx)
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer it.Close()
 
-	for it.Next(&file) {
-		metaFiles = append(metaFiles, file)
-	}
-	if it.Err() != nil {
-		t.Fatal(it.Err())
+	for ; ; it.Next() {
+		ok, err := it.Valid()
+		if err != nil {
+			t.Fatal(err)
+		}
+		if !ok {
+			break
+		}
+
+		metaFiles = append(metaFiles, *it.Value())
 	}
 
 	require.Equal(t, m.Files, metaFiles)
77 changes: 48 additions & 29 deletions pkg/ccl/backupccl/backupinfo/backup_metadata.go
@@ -262,6 +262,14 @@ func writeDescsToMetadata(
 	return nil
 }
 
+func FileCmp(left backuppb.BackupManifest_File, right backuppb.BackupManifest_File) int {
+	if cmp := left.Span.Key.Compare(right.Span.Key); cmp != 0 {
+		return cmp
+	}
+
+	return strings.Compare(left.Path, right.Path)
+}
+
 func writeFilesSST(
 	ctx context.Context,
 	m *backuppb.BackupManifest,
@@ -280,8 +288,7 @@ func writeFilesSST(

 	// Sort and write all of the files into a single file info SST.
 	sort.Slice(m.Files, func(i, j int) bool {
-		cmp := m.Files[i].Span.Key.Compare(m.Files[j].Span.Key)
-		return cmp < 0 || (cmp == 0 && strings.Compare(m.Files[i].Path, m.Files[j].Path) < 0)
+		return FileCmp(m.Files[i], m.Files[j]) < 0
 	})
 
 	for i := range m.Files {
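
For reference, FileCmp follows Go's three-way comparator convention (negative when left sorts first, zero on equality, positive otherwise): files order first by span start key, with path as the tie-break. A minimal illustrative test of that ordering (hypothetical values, not part of this PR):

package backupinfo_test

import (
	"sort"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo"
	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/stretchr/testify/require"
)

// TestFileCmpOrdering illustrates FileCmp: span start key first, then path.
func TestFileCmpOrdering(t *testing.T) {
	files := []backuppb.BackupManifest_File{
		{Span: roachpb.Span{Key: roachpb.Key("b")}, Path: "1.sst"},
		{Span: roachpb.Span{Key: roachpb.Key("a")}, Path: "2.sst"},
		{Span: roachpb.Span{Key: roachpb.Key("a")}, Path: "1.sst"},
	}
	sort.Slice(files, func(i, j int) bool {
		return backupinfo.FileCmp(files[i], files[j]) < 0
	})
	require.Equal(t, "1.sst", files[0].Path)              // (a, 1.sst) first
	require.Equal(t, "2.sst", files[1].Path)              // (a, 2.sst): same key, later path
	require.Equal(t, roachpb.Key("b"), files[2].Span.Key) // (b, 1.sst) last
}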
@@ -1015,10 +1022,11 @@ func (si *SpanIterator) Next(span *roachpb.Span) bool {
 	return false
 }
 
-// FileIterator is a simple iterator to iterate over stats.TableStatisticProtos.
+// FileIterator is a simple iterator to iterate over backuppb.BackupManifest_File.
 type FileIterator struct {
 	mergedIterator storage.SimpleMVCCIterator
 	err            error
+	file           *backuppb.BackupManifest_File
 }
 
 // NewFileIter creates a new FileIterator for the backup metadata.
@@ -1076,40 +1084,51 @@ func (fi *FileIterator) Close() {
 	fi.mergedIterator.Close()
 }
 
-// Err returns the iterator's error.
-func (fi *FileIterator) Err() error {
-	return fi.err
-}
-
-// Next retrieves the next file in the iterator.
-//
-// Next returns true if next element was successfully unmarshalled into file,
-// and false if there are no more elements or if an error was encountered. When
-// Next returns false, the user should call the Err method to verify the
-// existence of an error.
-func (fi *FileIterator) Next(file *backuppb.BackupManifest_File) bool {
+// Valid indicates whether or not the iterator is pointing to a valid value.
+func (fi *FileIterator) Valid() (bool, error) {
 	if fi.err != nil {
-		return false
+		return false, fi.err
 	}
 
-	valid, err := fi.mergedIterator.Valid()
-	if err != nil || !valid {
-		fi.err = err
-		return false
-	}
-	v, err := fi.mergedIterator.UnsafeValue()
-	if err != nil {
+	if ok, err := fi.mergedIterator.Valid(); !ok {
 		fi.err = err
-		return false
+		return ok, err
 	}
-	err = protoutil.Unmarshal(v, file)
-	if err != nil {
-		fi.err = err
-		return false
+
+	if fi.file == nil {
+		v, err := fi.mergedIterator.UnsafeValue()
+		if err != nil {
+			fi.err = err
+			return false, fi.err
+		}
+
+		file := &backuppb.BackupManifest_File{}
+		err = protoutil.Unmarshal(v, file)
+		if err != nil {
+			fi.err = err
+			return false, fi.err
+		}
+		fi.file = file
 	}
+	return true, nil
+}
+
+// Value returns the current value of the iterator, if valid.
+func (fi *FileIterator) Value() *backuppb.BackupManifest_File {
+	return fi.file
+}
+
+// Next advances the iterator to the next value.
+func (fi *FileIterator) Next() {
 	fi.mergedIterator.Next()
-	return true
+	fi.file = nil
 }
 
 // Reset resets the iterator to the first value.
 func (fi *FileIterator) Reset() {
 	fi.mergedIterator.SeekGE(storage.MVCCKey{})
 	fi.err = nil
+	fi.file = nil
 }
 
 // DescIterator is a simple iterator to iterate over descpb.Descriptors.
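
With this change, FileIterator follows the same Valid/Value/Next protocol as the storage.SimpleMVCCIterator it wraps, buffering the unmarshalled file until the iterator advances. A sketch of consuming the new API (visit is a hypothetical callback; the loop mirrors checkFiles above):

package backupinfo_test

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backupinfo"
	"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
)

// consumeFiles drains a FileIterator using the new Valid/Value/Next protocol.
func consumeFiles(
	ctx context.Context,
	bm *backupinfo.BackupMetadata,
	visit func(backuppb.BackupManifest_File),
) error {
	it, err := bm.NewFileIter(ctx)
	if err != nil {
		return err
	}
	defer it.Close()

	for ; ; it.Next() {
		ok, err := it.Valid()
		if err != nil {
			return err
		}
		if !ok {
			break
		}
		// Value returns the iterator's buffered file; dereference to copy
		// it if it must outlive the next call to Next.
		visit(*it.Value())
	}
	return nil
}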
8 changes: 7 additions & 1 deletion pkg/ccl/backupccl/backuprand/backup_rand_test.go
@@ -33,7 +33,9 @@ import (
 // randomly generated tables and verifies their data and schema are preserved.
 // It tests that full database backup as well as all subsets of per-table backup
 // roundtrip properly. 50% of the time, the test runs the restore with the
-// schema_only parameter, which does not restore any rows from user tables.
+// schema_only parameter, which does not restore any rows from user tables. The
+// test will also run with bulkio.restore.use_simple_import_spans set to true
+// 50% of the time.
 func TestBackupRestoreRandomDataRoundtrips(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -72,6 +74,10 @@ func TestBackupRestoreRandomDataRoundtrips(t *testing.T) {
 		runSchemaOnlyExtension = ", schema_only"
 	}
 
+	if rng.Intn(2) == 0 {
+		sqlDB.Exec(t, "SET CLUSTER SETTING bulkio.restore.use_simple_import_spans = true")
+	}
+
 	tables := sqlDB.Query(t, `SELECT name FROM crdb_internal.tables WHERE
 		database_name = 'rand' AND schema_name = 'public'`)
 	var tableNames []string
24 changes: 20 additions & 4 deletions pkg/ccl/backupccl/bench_covering_test.go
@@ -14,6 +14,8 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
@@ -27,6 +29,7 @@ func BenchmarkCoverageChecks(b *testing.B) {
 	r, _ := randutil.NewTestRand()
 
 	for _, numBackups := range []int{1, 7, 24, 24 * 4} {
+		numBackups := numBackups
 		b.Run(fmt.Sprintf("numBackups=%d", numBackups), func(b *testing.B) {
 			for _, numSpans := range []int{10, 20, 100} {
 				b.Run(fmt.Sprintf("numSpans=%d", numSpans), func(b *testing.B) {
@@ -61,6 +64,7 @@ func BenchmarkRestoreEntryCover(b *testing.B) {
 	ctx := context.Background()
 	r, _ := randutil.NewTestRand()
 	for _, numBackups := range []int{1, 2, 24, 24 * 4} {
+		numBackups := numBackups
 		b.Run(fmt.Sprintf("numBackups=%d", numBackups), func(b *testing.B) {
 			for _, baseFiles := range []int{0, 100, 10000} {
 				b.Run(fmt.Sprintf("numFiles=%d", baseFiles), func(b *testing.B) {
@@ -82,10 +86,22 @@
 					layerToBackupManifestFileIterFactory, err := getBackupManifestFileIters(ctx, &execCfg,
 						backups, nil, nil)
 					require.NoError(b, err)
-					cov, err := makeSimpleImportSpans(backups[numBackups-1].Spans, backups,
-						layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier,
-						nil, 0)
-					require.NoError(b, err)
+
+					spanCh := make(chan execinfrapb.RestoreSpanEntry, 1000)
+
+					g := ctxgroup.WithContext(ctx)
+					g.GoCtx(func(ctx context.Context) error {
+						defer close(spanCh)
+						return generateAndSendImportSpans(ctx, backups[numBackups-1].Spans, backups,
+							layerToBackupManifestFileIterFactory, nil, introducedSpanFrontier, nil, 0, spanCh, false)
+					})
+
+					var cov []execinfrapb.RestoreSpanEntry
+					for entry := range spanCh {
+						cov = append(cov, entry)
+					}
+
+					require.NoError(b, g.Wait())
 					b.ReportMetric(float64(len(cov)), "coverSize")
 				}
 			})
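
The reworked benchmark exercises the streaming shape of the new code path: a producer goroutine sends RestoreSpanEntry values into a buffered channel and closes it when done, while the consumer drains the channel before checking the producer's error via g.Wait(). A minimal sketch of this ctxgroup producer/consumer pattern, with a hypothetical produce function standing in for generateAndSendImportSpans:

package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
)

// produce stands in for generateAndSendImportSpans: it streams values into ch
// and closes it so the consumer's range loop terminates.
func produce(ctx context.Context, ch chan<- int) error {
	defer close(ch)
	for i := 0; i < 5; i++ {
		select {
		case ch <- i:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

func main() {
	ctx := context.Background()
	ch := make(chan int, 1000)

	g := ctxgroup.WithContext(ctx)
	g.GoCtx(func(ctx context.Context) error {
		return produce(ctx, ch)
	})

	// Drain first, then surface any producer error, matching the order used
	// in BenchmarkRestoreEntryCover above.
	var results []int
	for v := range ch {
		results = append(results, v)
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	fmt.Println(results)
}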