Skip to content

Commit

Permalink
Merge pull request #63765 from pbardea/manual-backport-jobs
Browse files Browse the repository at this point in the history
release-20.2: backupccl: cluster restore import and restore jobs as canceled
  • Loading branch information
pbardea authored Apr 19, 2021
2 parents 0f7c01f + b3151e7 commit dfe5778
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 13 deletions.
54 changes: 41 additions & 13 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,9 +793,7 @@ func TestReintroduceOfflineSpans(t *testing.T) {

const numAccounts = 1000
ctx, _, srcDB, tempDir, cleanupSrc := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitNone, params)
_, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitNone, base.TestClusterArgs{})
defer cleanupSrc()
defer cleanupDst()

dbBackupLoc := "nodelocal://0/my_db_backup"
clusterBackupLoc := "nodelocal://0/my_cluster_backup"
Expand All @@ -819,7 +817,10 @@ func TestReintroduceOfflineSpans(t *testing.T) {
<-dbRestoreStarted
srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

// All the restore to finish. This will issue AddSSTable requests at a
var tsMidRestore string
srcDB.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&tsMidRestore)

// Allow the restore to finish. This will issue AddSSTable requests at a
// timestamp that is before the last incremental we just took.
close(blockDBRestore)

Expand All @@ -836,16 +837,43 @@ func TestReintroduceOfflineSpans(t *testing.T) {

srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

// Restore the incremental backup chain that has missing writes.
destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsBefore, clusterBackupLoc)
t.Run("spans-reintroduced", func(t *testing.T) {
_, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitNone, base.TestClusterArgs{})
defer cleanupDst()

// Restore the incremental backup chain that has missing writes.
destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsBefore, clusterBackupLoc)

// Assert that the restored database has the same number of rows in both the
// source and destination cluster.
checkQuery := `SELECT count(*) FROM restoredb.bank AS OF SYSTEM TIME ` + tsBefore
expectedCount := srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank`, expectedCount)

checkQuery = `SELECT count(*) FROM restoredb.bank@new_idx AS OF SYSTEM TIME ` + tsBefore
expectedCount = srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount)
})

t.Run("restore-canceled", func(t *testing.T) {
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()
_, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitNone, base.TestClusterArgs{})
defer cleanupDst()

// Assert that the restored database has the same number
// of rows in both the source and destination cluster.
checkQuery := `SELECT count(*) FROM restoredb.bank AS OF SYSTEM TIME ` + tsBefore
expectedCount := srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank`, expectedCount)
destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsMidRestore, clusterBackupLoc)

checkQuery = `SELECT count(*) FROM restoredb.bank@new_idx AS OF SYSTEM TIME ` + tsBefore
expectedCount = srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount)
// Wait for the cluster restore job to finish, as well as the restored RESTORE TABLE
// job to revert.
destDB.CheckQueryResultsRetry(t, `
SELECT description, status FROM [SHOW JOBS]
WHERE job_type = 'RESTORE' AND status NOT IN ('succeeded', 'canceled')`,
[][]string{},
)
// The cluster restore should succeed, but the table restore should have failed.
destDB.CheckQueryResults(t,
`SELECT status, count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' GROUP BY status ORDER BY status`,
[][]string{{"canceled", "1"}, {"succeeded", "1"}})

destDB.ExpectErr(t, `relation "restoredb.bank" does not exist`, `SELECT count(*) FROM restoredb.bank`)
})
}
116 changes: 116 additions & 0 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"context"
"fmt"
"math"
"strings"

"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
Expand Down Expand Up @@ -528,6 +529,10 @@ func restore(
if len(spans) == 0 {
return emptyRowCount, nil
}
details := job.Details().(jobspb.RestoreDetails)
if alreadyMigrated := checkForMigratedData(details); alreadyMigrated {
return emptyRowCount, nil
}

mu := struct {
syncutil.Mutex
Expand Down Expand Up @@ -1922,6 +1927,81 @@ func getRestoringPrivileges(
return updatedPrivileges, nil
}

func getImportAndRestoreJobs(
ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, stagingTableName string,
) ([]int64, error) {
pageSize := 100
allJobs := make([]int64, 0)
var maxID int64
for {
var done bool
var err error
var pageJobIDs []int64
done, maxID, pageJobIDs, err = getImportAndRestoreJobsPage(ctx, ie, stagingTableName, txn, maxID, pageSize)
if err != nil {
return nil, err
}
allJobs = append(allJobs, pageJobIDs...)
if done {
break
}
}
return allJobs, nil
}

func getImportAndRestoreJobsPage(
ctx context.Context,
ie *sql.InternalExecutor,
stagingTableName string,
txn *kv.Txn,
minID int64,
pageSize int,
) (done bool, maxID int64, jobIDs []int64, _ error) {
stmt := fmt.Sprintf("SELECT id, payload FROM %s "+
"WHERE id > $1 AND status IN ($2, $3, $4)"+
"ORDER BY id "+ // the ordering is important as we keep track of the maximum ID we've seen
"LIMIT $5", stagingTableName)
rows, err := ie.Query(ctx, "fetch-import-and-restore-jobs", nil /* txn */, stmt, minID, jobs.StatusRunning, jobs.StatusPaused, jobs.StatusPauseRequested, pageSize)
if err != nil {
return false, 0, nil, errors.Wrapf(err, "failed to fetch import and restore jobs from %s", stagingTableName)
}

if len(rows) == 0 {
return true, 0, nil, nil
}
// Track the highest ID we encounter, so it can serve as the bottom of the
// next page.
maxID = int64(*(rows[len(rows)-1][0].(*tree.DInt)))
// If we got as many rows as we asked for, there might be more.
morePages := len(rows) == pageSize

for _, row := range rows {
id, payloadBytes := row[0], row[1]
rawJobID, ok := id.(*tree.DInt)
if !ok {
return false, 0, nil, errors.New("could not parse jobID")
}
jobID := int64(*rawJobID)
payload, err := jobs.UnmarshalPayload(payloadBytes)
if err != nil {
return false, 0, nil, err
}
if payload.Type() == jobspb.TypeImport || payload.Type() == jobspb.TypeRestore {
jobIDs = append(jobIDs, jobID)
}
}

return !morePages, maxID, jobIDs, nil
}

// checkForMigratedData checks to see if any of the system tables have already
// been restored. If they have, then we have already restored all of the data
// to the cluster. We should not restore the data again, since we would be
// shadowing potentially migrated system table data.
func checkForMigratedData(details jobspb.RestoreDetails) bool {
return len(details.SystemTablesRestored) > 0
}

// restoreSystemTables atomically replaces the contents of the system tables
// with the data from the restored system tables.
func (r *restoreResumer) restoreSystemTables(
Expand Down Expand Up @@ -1955,6 +2035,25 @@ func (r *restoreResumer) restoreSystemTables(
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetDebugName("system-restore-txn")
stmtDebugName := fmt.Sprintf("restore-system-systemTable-%s", systemTableName)
// Perform any migrations of the data that are required.
switch systemTableName {
case systemschema.JobsTable.Name:
stagingTableName := restoreTempSystemDB + "." + systemTableName
jobsToRevert, err := getImportAndRestoreJobs(ctx, executor, txn, stagingTableName)
if err != nil {
return errors.Wrap(err, "failed to fetch IMPORT and RESTORE jobs")
}

var updateStatusQuery strings.Builder
fmt.Fprintf(&updateStatusQuery, "UPDATE %s SET status = $1 WHERE id IN ", stagingTableName)
writeJobsList(jobsToRevert, &updateStatusQuery)

_, err = executor.Exec(ctx, stmtDebugName+"-status-update", txn, updateStatusQuery.String(), jobs.StatusCancelRequested)
if err != nil {
return errors.Wrapf(err, "updating status for IMPORT and RESTORE jobs")
}
}

// Don't clear the jobs table as to not delete the jobs that are performing
// the restore.
if systemTableName == systemschema.SettingsTable.Name {
Expand Down Expand Up @@ -2011,6 +2110,23 @@ func (r *restoreResumer) restoreSystemTables(
return nil
}

// writeJobsList writes to the given string builder the provided jobIDs to the
// given string builder as a SQL list.
func writeJobsList(jobs []int64, q *strings.Builder) {
if q == nil {
return
}

fmt.Fprint(q, "(")
for i, job := range jobs {
if i > 0 {
fmt.Fprint(q, ", ")
}
fmt.Fprintf(q, "'%d'", job)
}
fmt.Fprint(q, ")")
}

var _ jobs.Resumer = &restoreResumer{}

func init() {
Expand Down

0 comments on commit dfe5778

Please sign in to comment.