From c5467415a67d290539e08be2c04b80aa719987ae Mon Sep 17 00:00:00 2001 From: Rui Hu Date: Wed, 29 Mar 2023 19:59:41 +0000 Subject: [PATCH] backupccl: add test with randomized completed spans to TestRestoreEntryCover Add some testing with randomized completed spans to TestRestoreEntryCover. This testing should demonstrate the correctness of generateAndSendImportSpans in the presence of completed spans. Informs: #98779 Release note: None --- .../backupccl/restore_span_covering_test.go | 107 +++++++++++++++++- 1 file changed, 104 insertions(+), 3 deletions(-) diff --git a/pkg/ccl/backupccl/restore_span_covering_test.go b/pkg/ccl/backupccl/restore_span_covering_test.go index 35f9e67b175a..360b6e8a2cc3 100644 --- a/pkg/ccl/backupccl/restore_span_covering_test.go +++ b/pkg/ccl/backupccl/restore_span_covering_test.go @@ -248,12 +248,13 @@ func makeImportSpans( spans []roachpb.Span, backups []backuppb.BackupManifest, layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, + highWaterMark []byte, targetSize int64, introducedSpanFrontier *spanUtils.Frontier, completedSpans []jobspb.RestoreProgress_FrontierEntry, useSimpleImportSpans bool, ) ([]execinfrapb.RestoreSpanEntry, error) { - var cover []execinfrapb.RestoreSpanEntry + cover := make([]execinfrapb.RestoreSpanEntry, 0) spanCh := make(chan execinfrapb.RestoreSpanEntry) g := ctxgroup.WithContext(context.Background()) g.Go(func() error { @@ -270,10 +271,10 @@ func makeImportSpans( filter, err := makeSpanCoveringFilter( checkpointFrontier, - nil, + highWaterMark, introducedSpanFrontier, targetSize, - true) + highWaterMark == nil) if err != nil { return nil, err } @@ -398,6 +399,7 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, + nil, noSpanTargetSize, emptySpanFrontier, emptyCompletedSpans, @@ -417,6 +419,7 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, + nil, noSpanTargetSize, emptySpanFrontier, emptyCompletedSpans, @@ -437,6 +440,7 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, + nil, 2<<20, emptySpanFrontier, emptyCompletedSpans, @@ -456,6 +460,7 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, + nil, 2<<20, emptySpanFrontier, emptyCompletedSpans, @@ -477,6 +482,7 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, + nil, noSpanTargetSize, introducedSpanFrontier, emptyCompletedSpans, @@ -495,6 +501,7 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, + nil, noSpanTargetSize, introducedSpanFrontier, emptyCompletedSpans, @@ -522,6 +529,7 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, + nil, noSpanTargetSize, emptySpanFrontier, persistFrontier(frontier, 0), @@ -539,6 +547,7 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, + nil, noSpanTargetSize, emptySpanFrontier, persistFrontier(frontier, 0), @@ -926,6 +935,7 @@ func TestRestoreEntryCoverReIntroducedSpans(t *testing.T) { restoreSpans, backups, layerToIterFactory, + nil, 0, introducedSpanFrontier, []jobspb.RestoreProgress_FrontierEntry{}, @@ -986,6 +996,29 @@ func TestRestoreEntryCover(t *testing.T) { defer cleanupFn() execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig) + // getRandomCompletedSpans randomly gets up to maxNumSpans completed + // spans from the cover. A completed span can cover 1 or more + // RestoreSpanEntry in the cover. + getRandomCompletedSpans := func(cover []execinfrapb.RestoreSpanEntry, maxNumSpans int) []roachpb.Span { + var completedSpans []roachpb.Span + for i := 0; i < maxNumSpans; i++ { + start := rand.Intn(len(cover) + 1) + length := rand.Intn(len(cover) + 1 - start) + if length == 0 { + continue + } + + sp := roachpb.Span{ + Key: cover[start].Span.Key, + EndKey: cover[start+length-1].Span.EndKey, + } + completedSpans = append(completedSpans, sp) + } + + merged, _ := roachpb.MergeSpans(&completedSpans) + return merged + } + for _, numBackups := range []int{1, 2, 3, 5, 9, 10, 11, 12} { for _, spans := range []int{1, 2, 3, 5, 9, 11, 12} { for _, files := range []int{0, 1, 2, 3, 4, 10, 12, 50} { @@ -1009,6 +1042,7 @@ func TestRestoreEntryCover(t *testing.T) { backups[numBackups-1].Spans, backups, layerToIterFactory, + nil, target<<20, introducedSpanFrontier, []jobspb.RestoreProgress_FrontierEntry{}, @@ -1016,6 +1050,73 @@ func TestRestoreEntryCover(t *testing.T) { require.NoError(t, err) require.NoError(t, checkRestoreCovering(ctx, backups, backups[numBackups-1].Spans, cover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage)) + + // Check that the correct import spans are created if the job is + // resumed after the completion of some random entries in the cover. + if len(cover) > 0 { + for n := 1; n <= 5; n++ { + var completedSpans []roachpb.Span + var highWater []byte + var frontierEntries []jobspb.RestoreProgress_FrontierEntry + + // Randomly choose to use frontier checkpointing instead of + // explicitly testing both forms to avoid creating an exponential + // number of tests. + useFrontierCheckpointing := rand.Intn(2) == 0 + if useFrontierCheckpointing { + completedSpans = getRandomCompletedSpans(cover, n) + for _, sp := range completedSpans { + frontierEntries = append(frontierEntries, jobspb.RestoreProgress_FrontierEntry{ + Span: sp, + Timestamp: completedSpanTime, + }) + } + } else { + idx := r.Intn(len(cover)) + completedSpans = append(completedSpans, roachpb.Span{ + Key: cover[0].Span.Key, + EndKey: cover[idx].Span.EndKey, + }) + highWater = cover[idx].Span.EndKey + } + + resumeCover, err := makeImportSpans( + ctx, + backups[numBackups-1].Spans, + backups, + layerToIterFactory, + highWater, + target<<20, + introducedSpanFrontier, + frontierEntries, + simpleImportSpans) + require.NoError(t, err) + + // Compute the spans that are required on resume by subtracting + // completed spans from the original required spans. + var resumedRequiredSpans roachpb.Spans + for _, origReq := range backups[numBackups-1].Spans { + var resumeReq []roachpb.Span + if useFrontierCheckpointing { + resumeReq = roachpb.SubtractSpans([]roachpb.Span{origReq}, completedSpans) + } else { + resumeReq = roachpb.SubtractSpans([]roachpb.Span{origReq}, []roachpb.Span{{Key: cover[0].Span.Key, EndKey: highWater}}) + } + resumedRequiredSpans = append(resumedRequiredSpans, resumeReq...) + } + + var errorMsg string + if useFrontierCheckpointing { + errorMsg = fmt.Sprintf("completed spans in frontier: %v", completedSpans) + } else { + errorMsg = fmt.Sprintf("highwater: %v", highWater) + } + + require.NoError(t, checkRestoreCovering(ctx, backups, resumedRequiredSpans, + resumeCover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage), + errorMsg) + } + } }) } }