Skip to content

Commit

Permalink
backupccl: add test with randomized completed spans to TestRestoreEnt…
Browse files Browse the repository at this point in the history
…ryCover

Add some testing with randomized completed spans to
TestRestoreEntryCover. This testing should demonstrate the correctness
of generateAndSendImportSpans in the presence of completed spans.

Informs: cockroachdb#98779

Release note: None
  • Loading branch information
Rui Hu committed Mar 29, 2023
1 parent d8ce1ce commit c546741
Showing 1 changed file with 104 additions and 3 deletions.
107 changes: 104 additions & 3 deletions pkg/ccl/backupccl/restore_span_covering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,13 @@ func makeImportSpans(
spans []roachpb.Span,
backups []backuppb.BackupManifest,
layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory,
highWaterMark []byte,
targetSize int64,
introducedSpanFrontier *spanUtils.Frontier,
completedSpans []jobspb.RestoreProgress_FrontierEntry,
useSimpleImportSpans bool,
) ([]execinfrapb.RestoreSpanEntry, error) {
var cover []execinfrapb.RestoreSpanEntry
cover := make([]execinfrapb.RestoreSpanEntry, 0)
spanCh := make(chan execinfrapb.RestoreSpanEntry)
g := ctxgroup.WithContext(context.Background())
g.Go(func() error {
Expand All @@ -270,10 +271,10 @@ func makeImportSpans(

filter, err := makeSpanCoveringFilter(
checkpointFrontier,
nil,
highWaterMark,
introducedSpanFrontier,
targetSize,
true)
highWaterMark == nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -398,6 +399,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
noSpanTargetSize,
emptySpanFrontier,
emptyCompletedSpans,
Expand All @@ -417,6 +419,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
noSpanTargetSize,
emptySpanFrontier,
emptyCompletedSpans,
Expand All @@ -437,6 +440,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
2<<20,
emptySpanFrontier,
emptyCompletedSpans,
Expand All @@ -456,6 +460,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
2<<20,
emptySpanFrontier,
emptyCompletedSpans,
Expand All @@ -477,6 +482,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
noSpanTargetSize,
introducedSpanFrontier,
emptyCompletedSpans,
Expand All @@ -495,6 +501,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
noSpanTargetSize,
introducedSpanFrontier,
emptyCompletedSpans,
Expand Down Expand Up @@ -522,6 +529,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
noSpanTargetSize,
emptySpanFrontier,
persistFrontier(frontier, 0),
Expand All @@ -539,6 +547,7 @@ func TestRestoreEntryCoverExample(t *testing.T) {
spans,
backups,
layerToIterFactory,
nil,
noSpanTargetSize,
emptySpanFrontier,
persistFrontier(frontier, 0),
Expand Down Expand Up @@ -926,6 +935,7 @@ func TestRestoreEntryCoverReIntroducedSpans(t *testing.T) {
restoreSpans,
backups,
layerToIterFactory,
nil,
0,
introducedSpanFrontier,
[]jobspb.RestoreProgress_FrontierEntry{},
Expand Down Expand Up @@ -986,6 +996,29 @@ func TestRestoreEntryCover(t *testing.T) {
defer cleanupFn()
execCfg := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig)

// getRandomCompletedSpans randomly gets up to maxNumSpans completed
// spans from the cover. A completed span can cover 1 or more
// RestoreSpanEntry in the cover.
getRandomCompletedSpans := func(cover []execinfrapb.RestoreSpanEntry, maxNumSpans int) []roachpb.Span {
var completedSpans []roachpb.Span
for i := 0; i < maxNumSpans; i++ {
start := rand.Intn(len(cover) + 1)
length := rand.Intn(len(cover) + 1 - start)
if length == 0 {
continue
}

sp := roachpb.Span{
Key: cover[start].Span.Key,
EndKey: cover[start+length-1].Span.EndKey,
}
completedSpans = append(completedSpans, sp)
}

merged, _ := roachpb.MergeSpans(&completedSpans)
return merged
}

for _, numBackups := range []int{1, 2, 3, 5, 9, 10, 11, 12} {
for _, spans := range []int{1, 2, 3, 5, 9, 11, 12} {
for _, files := range []int{0, 1, 2, 3, 4, 10, 12, 50} {
Expand All @@ -1009,13 +1042,81 @@ func TestRestoreEntryCover(t *testing.T) {
backups[numBackups-1].Spans,
backups,
layerToIterFactory,
nil,
target<<20,
introducedSpanFrontier,
[]jobspb.RestoreProgress_FrontierEntry{},
simpleImportSpans)
require.NoError(t, err)
require.NoError(t, checkRestoreCovering(ctx, backups, backups[numBackups-1].Spans,
cover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage))

// Check that the correct import spans are created if the job is
// resumed after the completion of some random entries in the cover.
if len(cover) > 0 {
for n := 1; n <= 5; n++ {
var completedSpans []roachpb.Span
var highWater []byte
var frontierEntries []jobspb.RestoreProgress_FrontierEntry

// Randomly choose to use frontier checkpointing instead of
// explicitly testing both forms to avoid creating an exponential
// number of tests.
useFrontierCheckpointing := rand.Intn(2) == 0
if useFrontierCheckpointing {
completedSpans = getRandomCompletedSpans(cover, n)
for _, sp := range completedSpans {
frontierEntries = append(frontierEntries, jobspb.RestoreProgress_FrontierEntry{
Span: sp,
Timestamp: completedSpanTime,
})
}
} else {
idx := r.Intn(len(cover))
completedSpans = append(completedSpans, roachpb.Span{
Key: cover[0].Span.Key,
EndKey: cover[idx].Span.EndKey,
})
highWater = cover[idx].Span.EndKey
}

resumeCover, err := makeImportSpans(
ctx,
backups[numBackups-1].Spans,
backups,
layerToIterFactory,
highWater,
target<<20,
introducedSpanFrontier,
frontierEntries,
simpleImportSpans)
require.NoError(t, err)

// Compute the spans that are required on resume by subtracting
// completed spans from the original required spans.
var resumedRequiredSpans roachpb.Spans
for _, origReq := range backups[numBackups-1].Spans {
var resumeReq []roachpb.Span
if useFrontierCheckpointing {
resumeReq = roachpb.SubtractSpans([]roachpb.Span{origReq}, completedSpans)
} else {
resumeReq = roachpb.SubtractSpans([]roachpb.Span{origReq}, []roachpb.Span{{Key: cover[0].Span.Key, EndKey: highWater}})
}
resumedRequiredSpans = append(resumedRequiredSpans, resumeReq...)
}

var errorMsg string
if useFrontierCheckpointing {
errorMsg = fmt.Sprintf("completed spans in frontier: %v", completedSpans)
} else {
errorMsg = fmt.Sprintf("highwater: %v", highWater)
}

require.NoError(t, checkRestoreCovering(ctx, backups, resumedRequiredSpans,
resumeCover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage),
errorMsg)
}
}
})
}
}
Expand Down

0 comments on commit c546741

Please sign in to comment.