Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

backup,restore: support backing up / restore system databases #1048

Merged
merged 17 commits into from
Apr 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/br/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func newFullBackupCommand() *cobra.Command {
return runBackupCommand(command, "Full backup")
},
}
task.DefineFilterFlags(command)
task.DefineFilterFlags(command, acceptAllTables)
return command
}

Expand Down
14 changes: 14 additions & 0 deletions cmd/br/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ var (
hasLogFile uint64
tidbGlue = gluetidb.New()
envLogToTermKey = "BR_LOG_TO_TERM"

filterOutSysAndMemTables = []string{
"*.*",
fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")),
"!mysql.*",
"!sys.*",
"!INFORMATION_SCHEMA.*",
"!PERFORMANCE_SCHEMA.*",
"!METRICS_SCHEMA.*",
"!INSPECTION_SCHEMA.*",
}
acceptAllTables = []string{
"*.*",
}
)

const (
Expand Down
4 changes: 2 additions & 2 deletions cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func newFullRestoreCommand() *cobra.Command {
return runRestoreCommand(cmd, "Full restore")
},
}
task.DefineFilterFlags(command)
task.DefineFilterFlags(command, filterOutSysAndMemTables)
return command
}

Expand Down Expand Up @@ -159,7 +159,7 @@ func newLogRestoreCommand() *cobra.Command {
return runLogRestoreCommand(cmd)
},
}
task.DefineFilterFlags(command)
task.DefineFilterFlags(command, filterOutSysAndMemTables)
task.DefineLogRestoreFlags(command)
return command
}
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,8 @@ error = '''
failed to write and ingest
'''

["BR:Restore:ErrUnsupportedSysTable"]
error = '''
the system table isn't supported for restoring yet
'''

2 changes: 1 addition & 1 deletion pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func BuildBackupRangeAndSchema(

for _, dbInfo := range dbs {
// skip system databases
if util.IsMemOrSysDB(dbInfo.Name.L) {
if !tableFilter.MatchSchema(dbInfo.Name.O) || util.IsMemDB(dbInfo.Name.L) {
continue
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ func (ss *Schemas) BackupSchemas(

schemas := make([]*backuppb.Schema, 0, len(ss.schemas))
for name, schema := range ss.schemas {
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}
dbBytes, err := json.Marshal(schema.dbInfo)
if err != nil {
return nil, errors.Trace(err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
c.Assert(backupSchemas, IsNil)

// Empty database.
noFilter, err := filter.Parse([]string{"*.*"})
// Filter out system tables manually.
noFilter, err := filter.Parse([]string{"*.*", "!mysql.*"})
c.Assert(err, IsNil)
_, backupSchemas, err = backup.BuildBackupRangeAndSchema(
s.mock.Storage, noFilter, math.MaxUint64)
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
ErrRestoreInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Restore:ErrRestoreInvalidRange"))
ErrRestoreWriteAndIngest = errors.Normalize("failed to write and ingest", errors.RFCCodeText("BR:Restore:ErrRestoreWriteAndIngest"))
ErrRestoreSchemaNotExists = errors.Normalize("schema not exists", errors.RFCCodeText("BR:Restore:ErrRestoreSchemaNotExists"))
ErrUnsupportedSystemTable = errors.Normalize("the system table isn't supported for restoring yet", errors.RFCCodeText("BR:Restore:ErrUnsupportedSysTable"))

// TODO maybe it belongs to PiTR.
ErrRestoreRTsConstrain = errors.Normalize("resolved ts constrain violation", errors.RFCCodeText("BR:Restore:ErrRestoreResolvedTsConstrain"))
Expand Down
2 changes: 1 addition & 1 deletion pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (g Glue) GetVersion() string {

// Execute implements glue.Session.
func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
_, err := gs.se.Execute(ctx, sql)
_, err := gs.se.ExecuteInternal(ctx, sql)
return errors.Trace(err)
}

Expand Down
185 changes: 185 additions & 0 deletions pkg/restore/systable_restore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore

import (
"context"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"go.uber.org/multierr"
"go.uber.org/zap"

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/logutil"
"github.com/pingcap/br/pkg/utils"
)

var statsTables = map[string]struct{}{
"stats_buckets": {},
"stats_extended": {},
"stats_feedback": {},
"stats_fm_sketch": {},
"stats_histograms": {},
"stats_meta": {},
"stats_top_n": {},
}

func isStatsTable(tableName string) bool {
_, ok := statsTables[tableName]
return ok
}

// RestoreSystemSchemas restores the system schema(i.e. the `mysql` schema).
// Detail see https://github.com/pingcap/br/issues/679#issuecomment-762592254.
func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) {
sysDB := mysql.SystemDB

temporaryDB := utils.TemporaryDBName(sysDB)
defer rc.cleanTemporaryDatabase(ctx, sysDB)
3pointer marked this conversation as resolved.
Show resolved Hide resolved

if !f.MatchSchema(temporaryDB.O) {
log.Debug("system database filtered out", zap.String("database", sysDB))
return
}
originDatabase, ok := rc.databases[temporaryDB.O]
if !ok {
log.Info("system database not backed up, skipping", zap.String("database", sysDB))
return
}
db, ok := rc.getDatabaseByName(sysDB)
if !ok {
// Or should we create the database here?
log.Warn("target database not exist, aborting", zap.String("database", sysDB))
return
}

tablesRestored := make([]string, 0, len(originDatabase.Tables))
for _, table := range originDatabase.Tables {
tableName := table.Info.Name
if f.MatchTable(sysDB, tableName.O) {
if err := rc.replaceTemporaryTableToSystable(ctx, tableName.L, db); err != nil {
logutil.WarnTerm("error during merging temporary tables into system tables",
logutil.ShortError(err),
zap.Stringer("table", tableName),
)
}
}
tablesRestored = append(tablesRestored, tableName.L)
}
if err := rc.afterSystemTablesReplaced(ctx, tablesRestored); err != nil {
for _, e := range multierr.Errors(err) {
logutil.WarnTerm("error during reconfigurating the system tables", zap.String("database", sysDB), logutil.ShortError(e))
}
}
}

// database is a record of a database.
// For fast querying whether a table exists and the temporary database of it.
type database struct {
ExistingTables map[string]*model.TableInfo
Name model.CIStr
TemporaryName model.CIStr
}

// getDatabaseByName make a record of a database from info schema by its name.
func (rc *Client) getDatabaseByName(name string) (*database, bool) {
infoSchema := rc.dom.InfoSchema()
schema, ok := infoSchema.SchemaByName(model.NewCIStr(name))
if !ok {
return nil, false
}
db := &database{
ExistingTables: map[string]*model.TableInfo{},
Name: model.NewCIStr(name),
TemporaryName: utils.TemporaryDBName(name),
}
for _, t := range schema.Tables {
db.ExistingTables[t.Name.L] = t
}
return db, true
}

// afterSystemTablesReplaced do some extra work for special system tables.
// e.g. after inserting to the table mysql.user, we must execute `FLUSH PRIVILEGES` to allow it take effect.
func (rc *Client) afterSystemTablesReplaced(ctx context.Context, tables []string) error {
var err error
for _, table := range tables {
switch {
case table == "user":
// We cannot execute `rc.dom.NotifyUpdatePrivilege` here, because there isn't
// sessionctx.Context provided by the glue.
// TODO: update the glue type and allow we retrive a session context from it.
err = multierr.Append(err, errors.Annotatef(berrors.ErrUnsupportedSystemTable,
"restored user info may not take effect, until you should execute `FLUSH PRIVILEGES` manually"))
}
}
return err
}

// replaceTemporaryTableToSystable replaces the temporary table to real system table.
func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, tableName string, db *database) error {
execSQL := func(sql string) error {
// SQLs here only contain table name and database name, seems it is no need to redact them.
if err := rc.db.se.Execute(ctx, sql); err != nil {
log.Warn("failed to execute SQL restore system database",
zap.String("table", tableName),
zap.Stringer("database", db.Name),
zap.String("sql", sql),
zap.Error(err),
)
return berrors.ErrUnknown.Wrap(err).GenWithStack("failed to execute %s", sql)
}
log.Info("successfully restore system database",
zap.String("table", tableName),
zap.Stringer("database", db.Name),
zap.String("sql", sql),
)
return nil
}

// The newly created tables have different table IDs with original tables,
// hence the old statistics are invalid.
//
// TODO:
// 1 ) Rewrite the table IDs via `UPDATE _temporary_mysql.stats_xxx SET table_id = new_table_id WHERE table_id = old_table_id`
// BEFORE replacing into and then execute `rc.statsHandler.Update(rc.dom.InfoSchema())`.
// 1.5 ) (Optional) The UPDATE statement sometimes costs, the whole system tables restore step can be place into the restore pipeline.
// 2 ) Deprecate the origin interface for backing up statistics.
if isStatsTable(tableName) {
return berrors.ErrUnsupportedSystemTable.GenWithStack("restoring stats via `mysql` schema isn't support yet: " +
"the table ID is out-of-date and may corrupt existing statistics")
}

if db.ExistingTables[tableName] != nil {
log.Info("table existing, using replace into for restore",
zap.String("table", tableName),
zap.Stringer("schema", db.Name))
replaceIntoSQL := fmt.Sprintf("REPLACE INTO %s SELECT * FROM %s;",
utils.EncloseDBAndTable(db.Name.L, tableName),
utils.EncloseDBAndTable(db.TemporaryName.L, tableName))
return execSQL(replaceIntoSQL)
}

renameSQL := fmt.Sprintf("RENAME TABLE %s TO %s;",
utils.EncloseDBAndTable(db.TemporaryName.L, tableName),
utils.EncloseDBAndTable(db.Name.L, tableName),
)
return execSQL(renameSQL)
}

func (rc *Client) cleanTemporaryDatabase(ctx context.Context, originDB string) {
database := utils.TemporaryDBName(originDB)
log.Debug("dropping temporary database", zap.Stringer("database", database))
sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", utils.EncloseName(database.L))
if err := rc.db.se.Execute(ctx, sql); err != nil {
logutil.WarnTerm("failed to drop temporary database, it should be dropped manually",
zap.Stringer("database", database),
logutil.ShortError(err),
)
}
}
4 changes: 2 additions & 2 deletions pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ func DefineTableFlags(command *cobra.Command) {
}

// DefineFilterFlags defines the --filter and --case-sensitive flags for `full` subcommand.
func DefineFilterFlags(command *cobra.Command) {
func DefineFilterFlags(command *cobra.Command, defaultFilter []string) {
flags := command.Flags()
flags.StringArrayP(flagFilter, "f", []string{"*.*"}, "select tables to process")
flags.StringArrayP(flagFilter, "f", defaultFilter, "select tables to process")
flags.Bool(flagCaseSensitive, false, "whether the table names used in --filter should be case-sensitive")
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return errors.Trace(err)
}

// The cost of rename user table / replace into system table wouldn't be so high.
// So leave it out of the pipeline for easier implementation.
client.RestoreSystemSchemas(ctx, cfg.TableFilter)

// Set task summary to success status.
summary.SetSuccessStatus(true)
return nil
Expand Down
18 changes: 18 additions & 0 deletions pkg/utils/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ package utils

import (
"encoding/json"
"fmt"
"strings"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/tablecodec"

Expand Down Expand Up @@ -155,3 +157,19 @@ func ArchiveSize(meta *backuppb.BackupMeta) uint64 {
func EncloseName(name string) string {
return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// EncloseDBAndTable formats the database and table name in sql.
func EncloseDBAndTable(database, table string) string {
return fmt.Sprintf("%s.%s", EncloseName(database), EncloseName(table))
}

// IsSysDB tests whether the database is system DB.
// Currently, the only system DB is mysql.
func IsSysDB(dbLowerName string) bool {
return dbLowerName == mysql.SystemDB
}

// TemporaryDBName makes a 'private' database name.
func TemporaryDBName(db string) model.CIStr {
return model.NewCIStr("__TiDB_BR_Temporary_" + db)
}
2 changes: 1 addition & 1 deletion tests/br_full_ddl/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ cluster_index_before_backup=$(run_sql "show variables like '%cluster%';" | awk '
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG --ignore-stats=false || cat $LOG
checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs)

if [ "${checksum_count}" != "1" ];then
if [ "${checksum_count}" -lt "1" ];then
echo "TEST: [$TEST_NAME] fail on fast checksum"
echo $(cat $LOG | grep checksum)
exit 1
Expand Down
4 changes: 2 additions & 2 deletions tests/br_full_index/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ BR_LOG_TO_TERM=1

checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs)

if [ "${checksum_count}" != "$DB_COUNT" ];then
echo "TEST: [$TEST_NAME] fail on fast checksum"
if [ "${checksum_count}" -lt "$DB_COUNT" ];then
echo "TEST: [$TEST_NAME] fail on fast checksum: required $DB_COUNT databases checked, but only ${checksum_count} dbs checked"
echo $(cat $LOG | grep checksum)
exit 1
fi
Expand Down
Loading