Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sctest: Augmented BACKUP/RESTORE tests with table-level restore #87316

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 110 additions & 47 deletions pkg/sql/schemachanger/sctest/cumulative.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,15 +360,20 @@ var runAllBackups = flag.Bool(
// cluster constructor needs to provide a cluster with CCL BACKUP/RESTORE
// functionality enabled.
func Backup(t *testing.T, path string, newCluster NewClusterFunc) {
var after [][]string
var after [][]string // CREATE_STATEMENT for all descriptors after finishing `stmts` in each test case.
var dbName string
r, _ := randutil.NewTestRand()
const runRate = .5

maybeRandomlySkip := func(t *testing.T) {
if !*runAllBackups && r.Float64() >= runRate {
skip.IgnoreLint(t, "skipping due to randomness")
}
}

// A function that executes `setup` first and then count the number of
// postCommit and postCommitNonRevertible stages for executing `stmts`.
// It also initializes `after` and `dbName` here.
countStages := func(
t *testing.T, setup, stmts []parser.Statement,
) (postCommit, nonRevertible int) {
Expand All @@ -389,39 +394,22 @@ func Backup(t *testing.T, path string, newCluster NewClusterFunc) {
})
return postCommit, nonRevertible
}
var testBackupRestoreCase func(
t *testing.T, setup, stmts []parser.Statement, ord int,
)
testFunc := func(t *testing.T, _ string, _ bool, setup, stmts []parser.Statement) {
postCommit, nonRevertible := countStages(t, setup, stmts)
n := postCommit + nonRevertible
t.Logf(
"test case has %d revertible post-commit stages and %d non-revertible"+
" post-commit stages", postCommit, nonRevertible,
)
for i := 0; i <= n; i++ {
if !t.Run(
fmt.Sprintf("backup/restore stage %d of %d", i, n),
func(t *testing.T) {
maybeRandomlySkip(t)
testBackupRestoreCase(t, setup, stmts, i)
},
) {
return
}
}
}
type stage struct {
p scplan.Plan
stageIdx int
resume chan error
}
mkStage := func(p scplan.Plan, stageIdx int) stage {
return stage{p: p, stageIdx: stageIdx, resume: make(chan error)}
}
testBackupRestoreCase = func(

// A function that takes backup at `ord`-th stage while executing `stmts` after
// finishing `setup`. It also takes `ord` backups at each of the preceding stage
// if it's a revertible stage.
// It then restores the backup(s) in various "flavors" (see
// comment below for details) and expect the restore to finish the schema change job
// as if the backup/restore had never happened.
testBackupRestoreCase := func(
t *testing.T, setup, stmts []parser.Statement, ord int,
) {
type stage struct {
p scplan.Plan
stageIdx int
resume chan error
}

stageChan := make(chan stage)
ctx, cancel := context.WithCancel(context.Background())
db, cleanup := newCluster(t, &scexec.TestingKnobs{
Expand All @@ -430,7 +418,7 @@ func Backup(t *testing.T, path string, newCluster NewClusterFunc) {
return nil
}
if stageChan != nil {
s := mkStage(p, stageIdx)
s := stage{p: p, stageIdx: stageIdx, resume: make(chan error)}
select {
case stageChan <- s:
case <-ctx.Done():
Expand Down Expand Up @@ -561,21 +549,75 @@ func Backup(t *testing.T, path string, newCluster NewClusterFunc) {
t.Logf("finished")

for i, b := range backups {
for _, isSchemaOnly := range []bool{true, false} {
name := ""
if isSchemaOnly {
name = "schema-only"
}
t.Run(name, func(t *testing.T) {
// For each backup, we restore it in three flavors.
// 1. RESTORE DATABASE
// 2. RESTORE DATABASE WITH schema_only
// 3. RESTORE TABLE tbl1, tbl2, ..., tblN
// We then assert that the restored database should correctly finish
// the ongoing schema change job when the backup was taken, and
// reaches the expected state as if the back/restore had not happened at all.
// Skip a backup randomly.
type backupConsumptionFlavor struct {
name string
restoreSetup []string
restoreQuery string
}
flavors := []backupConsumptionFlavor{
{
name: "restore database",
restoreSetup: []string{
fmt.Sprintf("DROP DATABASE IF EXISTS %q CASCADE", dbName),
"SET use_declarative_schema_changer = 'off'",
},
restoreQuery: fmt.Sprintf("RESTORE DATABASE %s FROM LATEST IN '%s'", dbName, b.url),
},
{
name: "restore database with schema-only",
restoreSetup: []string{
fmt.Sprintf("DROP DATABASE IF EXISTS %q CASCADE", dbName),
"SET use_declarative_schema_changer = 'off'",
},
restoreQuery: fmt.Sprintf("RESTORE DATABASE %s FROM LATEST IN '%s' with schema_only", dbName, b.url),
},
}

// For the third flavor, we restore all tables in the backup.
// Skip it if there is no tables.
rows := tdb.QueryStr(t, `
SELECT parent_schema_name, object_name
FROM [SHOW BACKUP FROM LATEST IN $1]
WHERE database_name = $2 AND object_type = 'table'`, b.url, dbName)
var tablesToRestore []string
for _, row := range rows {
tablesToRestore = append(tablesToRestore, fmt.Sprintf("%s.%s.%s", dbName, row[0], row[1]))
}

if len(tablesToRestore) > 0 {
flavors = append(flavors, backupConsumptionFlavor{
name: "restore all tables in database",
restoreSetup: []string{
fmt.Sprintf("DROP DATABASE IF EXISTS %q CASCADE", dbName),
fmt.Sprintf("CREATE DATABASE %q", dbName),
"SET use_declarative_schema_changer = 'off'",
},
restoreQuery: fmt.Sprintf("RESTORE TABLE %s FROM LATEST IN '%s' WITH skip_missing_sequences",
strings.Join(tablesToRestore, ","), b.url),
})
}

// TODO (xiang): Add here the fourth flavor that restores
// only a subset, maybe randomly chosen, of all tables with
// `RESTORE TABLE`. Currently, it's blocked by issue #87518.
// We will need to change what the expected output will be
// in this case, since it will no longer be simply `before`
// and `after`.

for _, flavor := range flavors {
t.Run(flavor.name, func(t *testing.T) {
maybeRandomlySkip(t)
t.Logf("testing backup %d %v", i, b.isRollback)
tdb.Exec(t, fmt.Sprintf("DROP DATABASE IF EXISTS %q CASCADE", dbName))
tdb.Exec(t, "SET use_declarative_schema_changer = 'off'")
restoreQuery := fmt.Sprintf("RESTORE DATABASE %s FROM LATEST IN '%s'", dbName, b.url)
if isSchemaOnly {
restoreQuery = restoreQuery + " with schema_only"
}
tdb.Exec(t, restoreQuery)
t.Logf("testing backup %d (rollback=%v)", i, b.isRollback)
tdb.ExecMultiple(t, flavor.restoreSetup...)
tdb.Exec(t, flavor.restoreQuery)
tdb.Exec(t, fmt.Sprintf("USE %q", dbName))
waitForSchemaChangesToFinish(t, tdb)
afterRestore := tdb.QueryStr(t, fetchDescriptorStateQuery)
Expand All @@ -596,6 +638,27 @@ SELECT * FROM crdb_internal.invalid_objects WHERE database_name != 'backups'
}
}
}

testFunc := func(t *testing.T, _ string, _ bool, setup, stmts []parser.Statement) {
postCommit, nonRevertible := countStages(t, setup, stmts)
n := postCommit + nonRevertible
t.Logf(
"test case has %d revertible post-commit stages and %d non-revertible"+
" post-commit stages", postCommit, nonRevertible,
)
for i := 0; i <= n; i++ {
if !t.Run(
fmt.Sprintf("backup/restore stage %d of %d", i, n),
func(t *testing.T) {
maybeRandomlySkip(t)
testBackupRestoreCase(t, setup, stmts, i)
},
) {
return
}
}
}

cumulativeTest(t, path, testFunc)
}

Expand Down