Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backupccl: drop temp system database on failed restore #70380

Merged
merged 1 commit into from
Sep 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2187,6 +2187,46 @@ func TestRestoreFailDatabaseCleanup(t *testing.T) {
)
}

func TestRestoreFailCleansUpTempSystemDatabase(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

_, _, sqlDB, dir, cleanup := BackupRestoreTestSetup(t, singleNode, 0, InitManualReplication)
defer cleanup()

// Create a database with a type and table.
sqlDB.Exec(t, `
CREATE DATABASE d;
CREATE TYPE d.ty AS ENUM ('hello');
CREATE TABLE d.tb (x d.ty);
INSERT INTO d.tb VALUES ('hello'), ('hello');
`)

// Cluster BACKUP.
sqlDB.Exec(t, `BACKUP TO $1`, LocalFoo)

// Bugger the backup by removing the SST files.
if err := filepath.Walk(dir+"/foo", func(path string, info os.FileInfo, err error) error {
if err != nil {
t.Fatal(err)
}
if info.Name() == backupManifestName || !strings.HasSuffix(path, ".sst") {
return nil
}
return os.Remove(path)
}); err != nil {
t.Fatal(err)
}

_, _, sqlDBRestore, cleanupRestore := backupRestoreTestSetupEmpty(t, singleNode, dir, InitManualReplication,
base.TestClusterArgs{})
defer cleanupRestore()
// We should get an error when restoring the table.
sqlDBRestore.ExpectErr(t, "sst: no such file", `RESTORE FROM $1`, LocalFoo)
row := sqlDBRestore.QueryStr(t, fmt.Sprintf(`SELECT * FROM [SHOW DATABASES] WHERE database_name = '%s'`, restoreTempSystemDB))
require.Equal(t, 0, len(row))
}

func TestBackupRestoreUserDefinedSchemas(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
32 changes: 26 additions & 6 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,7 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) error {
details := r.job.Details().(jobspb.RestoreDetails)
p := execCtx.(sql.JobExecContext)
r.execCfg = p.ExecCfg()

backupManifests, latestBackupManifest, sqlDescs, err := loadBackupSQLDescs(
ctx, p, details, details.Encryption,
Expand Down Expand Up @@ -1565,7 +1566,6 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
return err
}
}
r.execCfg = p.ExecCfg()
var remappedStats []*stats.TableStatisticProto
backupStats, err := getStatisticsFromBackup(ctx, defaultStore, details.Encryption,
latestBackupManifest)
Expand Down Expand Up @@ -1717,7 +1717,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
// Reload the details as we may have updated the job.
details = r.job.Details().(jobspb.RestoreDetails)

if err := r.cleanupTempSystemTables(ctx); err != nil {
if err := r.cleanupTempSystemTables(ctx, nil /* txn */); err != nil {
return err
}
}
Expand Down Expand Up @@ -2085,7 +2085,17 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}
return err
}
}
return r.dropDescriptors(ctx, execCfg.JobRegistry, execCfg.Codec, txn, descsCol)

if err := r.dropDescriptors(ctx, execCfg.JobRegistry, execCfg.Codec, txn, descsCol); err != nil {
return err
}

if details.DescriptorCoverage == tree.AllDescriptors {
// The temporary system table descriptors should already have been dropped
// in `dropDescriptors` but we still need to drop the temporary system db.
return r.cleanupTempSystemTables(ctx, txn)
}
return nil
}); err != nil {
return err
}
Expand Down Expand Up @@ -2535,16 +2545,26 @@ func (r *restoreResumer) restoreSystemTables(
return nil
}

func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context) error {
func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context, txn *kv.Txn) error {
executor := r.execCfg.InternalExecutor
// Check if the temp system database has already been dropped. This can happen
// if the restore job fails after the system database has cleaned up.
checkIfDatabaseExists := "SELECT database_name FROM [SHOW DATABASES] WHERE database_name=$1"
if row, err := executor.QueryRow(ctx, "checking-for-temp-system-db" /* opName */, txn, checkIfDatabaseExists, restoreTempSystemDB); err != nil {
return errors.Wrap(err, "checking for temporary system db")
} else if row == nil {
// Temporary system DB might already have been dropped by the restore job.
return nil
}

// After restoring the system tables, drop the temporary database holding the
// system tables.
gcTTLQuery := fmt.Sprintf("ALTER DATABASE %s CONFIGURE ZONE USING gc.ttlseconds=1", restoreTempSystemDB)
if _, err := executor.Exec(ctx, "altering-gc-ttl-temp-system" /* opName */, nil /* txn */, gcTTLQuery); err != nil {
if _, err := executor.Exec(ctx, "altering-gc-ttl-temp-system" /* opName */, txn, gcTTLQuery); err != nil {
log.Errorf(ctx, "failed to update the GC TTL of %q: %+v", restoreTempSystemDB, err)
}
dropTableQuery := fmt.Sprintf("DROP DATABASE %s CASCADE", restoreTempSystemDB)
if _, err := executor.Exec(ctx, "drop-temp-system-db" /* opName */, nil /* txn */, dropTableQuery); err != nil {
if _, err := executor.Exec(ctx, "drop-temp-system-db" /* opName */, txn, dropTableQuery); err != nil {
return errors.Wrap(err, "dropping temporary system db")
}
return nil
Expand Down