Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

backup,restore: support backing up / restore system databases #1048

Merged
merged 17 commits into from
Apr 30, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/br/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func newFullBackupCommand() *cobra.Command {
return runBackupCommand(command, "Full backup")
},
}
task.DefineFilterFlags(command)
task.DefineFilterFlags(command, acceptAllTables)
return command
}

Expand Down
14 changes: 14 additions & 0 deletions cmd/br/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ var (
hasLogFile uint64
tidbGlue = gluetidb.New()
envLogToTermKey = "BR_LOG_TO_TERM"

filterOutSysAndMemTables = []string{
"*.*",
fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")),
"!mysql.*",
"!sys.*",
"!INFORMATION_SCHEMA.*",
"!PERFORMANCE_SCHEMA.*",
"!METRICS_SCHEMA.*",
"!INSPECTION_SCHEMA.*",
}
acceptAllTables = []string{
"*.*",
}
)

const (
Expand Down
4 changes: 2 additions & 2 deletions cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func newFullRestoreCommand() *cobra.Command {
return runRestoreCommand(cmd, "Full restore")
},
}
task.DefineFilterFlags(command)
task.DefineFilterFlags(command, filterOutSysAndMemTables)
return command
}

Expand Down Expand Up @@ -159,7 +159,7 @@ func newLogRestoreCommand() *cobra.Command {
return runLogRestoreCommand(cmd)
},
}
task.DefineFilterFlags(command)
task.DefineFilterFlags(command, filterOutSysAndMemTables)
task.DefineLogRestoreFlags(command)
return command
}
Expand Down
4 changes: 4 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,7 @@ error = '''
failed to write and ingest
'''

["BR:Restore:ErrUnsupportedSysTable"]
error = '''
the system table isn't supported for restoring yet
'''
2 changes: 1 addition & 1 deletion pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func BuildBackupRangeAndSchema(

for _, dbInfo := range dbs {
// skip system databases
if util.IsMemOrSysDB(dbInfo.Name.L) {
if !tableFilter.MatchSchema(dbInfo.Name.O) || util.IsMemDB(dbInfo.Name.L) {
continue
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ func (ss *Schemas) BackupSchemas(

schemas := make([]*backuppb.Schema, 0, len(ss.schemas))
for name, schema := range ss.schemas {
if utils.IsSysDB(schema.dbInfo.Name.L) {
schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O)
}
dbBytes, err := json.Marshal(schema.dbInfo)
if err != nil {
return nil, errors.Trace(err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
c.Assert(backupSchemas, IsNil)

// Empty database.
noFilter, err := filter.Parse([]string{"*.*"})
// Filter out system tables manually.
noFilter, err := filter.Parse([]string{"*.*", "!mysql.*"})
c.Assert(err, IsNil)
_, backupSchemas, err = backup.BuildBackupRangeAndSchema(
s.mock.Storage, noFilter, math.MaxUint64)
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
ErrRestoreInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Restore:ErrRestoreInvalidRange"))
ErrRestoreWriteAndIngest = errors.Normalize("failed to write and ingest", errors.RFCCodeText("BR:Restore:ErrRestoreWriteAndIngest"))
ErrRestoreSchemaNotExists = errors.Normalize("schema not exists", errors.RFCCodeText("BR:Restore:ErrRestoreSchemaNotExists"))
ErrUnsupportedSystemTable = errors.Normalize("the system table isn't supported for restoring yet", errors.RFCCodeText("BR:Restore:ErrUnsupportedSysTable"))

// TODO maybe it belongs to PiTR.
ErrRestoreRTsConstrain = errors.Normalize("resolved ts constrain violation", errors.RFCCodeText("BR:Restore:ErrRestoreResolvedTsConstrain"))
Expand Down
2 changes: 1 addition & 1 deletion pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (g Glue) GetVersion() string {

// Execute implements glue.Session.
func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
_, err := gs.se.Execute(ctx, sql)
_, err := gs.se.ExecuteInternal(ctx, sql)
return errors.Trace(err)
}

Expand Down
163 changes: 163 additions & 0 deletions pkg/restore/systable_restore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore

import (
"context"
"fmt"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"go.uber.org/multierr"
"go.uber.org/zap"

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/logutil"
"github.com/pingcap/br/pkg/utils"
)

// RestoreSystemSchemas restores the system schema(i.e. the `mysql` schema).
// Detail see https://github.com/pingcap/br/issues/679#issuecomment-762592254.
func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) {
sysDB := mysql.SystemDB

temporaryDB := utils.TemporaryDBName(sysDB)
defer rc.cleanTemporaryDatabase(ctx, sysDB)
3pointer marked this conversation as resolved.
Show resolved Hide resolved

if !f.MatchSchema(temporaryDB.O) {
log.Debug("system database filtered out", zap.String("database", sysDB))
return
}
originDatabase, ok := rc.databases[temporaryDB.O]
if !ok {
log.Info("system database not backed up, skipping", zap.String("database", sysDB))
return
}
db, ok := rc.getDatabaseByName(sysDB)
if !ok {
// Or should we create the database here?
log.Warn("target database not exist, aborting", zap.String("database", sysDB))
return
}

tablesRestored := make([]string, 0, len(originDatabase.Tables))
for _, table := range originDatabase.Tables {
tableName := table.Info.Name
if f.MatchTable(sysDB, tableName.O) {
rc.replaceTemporaryTableToSystable(ctx, tableName.L, db)
}
tablesRestored = append(tablesRestored, tableName.L)
}
if err := rc.afterSystemTablesReplaced(ctx, tablesRestored); err != nil {
for _, e := range multierr.Errors(err) {
logutil.WarnTerm("error during winding up the restoration of system table", zap.String("database", sysDB), logutil.ShortError(e))
kennytm marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

// database is a record of a database.
// For fast querying whether a table exists and the temporary database of it.
type database struct {
ExistingTables map[string]*model.TableInfo
Name model.CIStr
TemporaryName model.CIStr
}

// getDatabaseByName make a record of a database from info schema by its name.
func (rc *Client) getDatabaseByName(name string) (*database, bool) {
infoSchema := rc.dom.InfoSchema()
schema, ok := infoSchema.SchemaByName(model.NewCIStr(name))
if !ok {
return nil, false
}
db := &database{
ExistingTables: map[string]*model.TableInfo{},
Name: model.NewCIStr(name),
TemporaryName: utils.TemporaryDBName(name),
}
for _, t := range schema.Tables {
db.ExistingTables[t.Name.L] = t
}
return db, true
}

// afterSystemTablesReplaced do some extra work for special system tables.
// e.g. after inserting to the table mysql.user, we must execute `FLUSH PRIVILEGES` to allow it take effect.
func (rc *Client) afterSystemTablesReplaced(ctx context.Context, tables []string) error {
needFlushStat := false
var err error
for _, table := range tables {
switch {
case table == "user":
// We cannot execute `rc.dom.NotifyUpdatePrivilege` here, because there isn't
// sessionctx.Context provided by the glue.
// TODO: update the glue type and allow we retrive a session context from it.
err = multierr.Append(err, errors.Annotatef(berrors.ErrUnsupportedSystemTable,
"restored user info may not take effect, until you should execute `FLUSH PRIVILEGES` manually"))
case strings.HasPrefix(table, "stats_"):
needFlushStat = true
}
}
if needFlushStat {
// The newly created tables have different table IDs with original tables,
// hence the old statistics are invalid.
//
// TODO:
// Plan A) rewrite the IDs in the `rc.statsHandler.Update(rc.dom.InfoSchema())`.
// Plan B) give the user a SQL to let him update it manually.
// If plan A applied, we can deprecate the origin interface for backing up statistics.
err = multierr.Append(err, errors.Annotatef(berrors.ErrUnsupportedSystemTable,
kennytm marked this conversation as resolved.
Show resolved Hide resolved
"table ID has been changed, old statistics would not be valid anymore"))
}
return err
}

// replaceTemporaryTableToSystable replaces the temporary table to real system table.
func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, tableName string, db *database) {
execSQL := func(sql string) {
if err := rc.db.se.Execute(ctx, sql); err != nil {
logutil.WarnTerm("failed to restore system table",
logutil.ShortError(err),
zap.String("table", tableName),
zap.Stringer("database", db.Name),
zap.String("sql", sql))
} else {
log.Info("successfully restore system database",
zap.String("table", tableName),
zap.Stringer("database", db.Name),
zap.String("sql", sql),
)
}
}
if db.ExistingTables[tableName] != nil {
log.Info("table existing, using replace into for restore",
zap.String("table", tableName),
zap.Stringer("schema", db.Name))
replaceIntoSQL := fmt.Sprintf("REPLACE INTO %s SELECT * FROM %s;",
utils.EncloseDBAndTable(db.Name.L, tableName),
utils.EncloseDBAndTable(db.TemporaryName.L, tableName))
execSQL(replaceIntoSQL)
return
}
renameSQL := fmt.Sprintf("RENAME TABLE %s TO %s;",
utils.EncloseDBAndTable(db.TemporaryName.L, tableName),
utils.EncloseDBAndTable(db.Name.L, tableName),
)
execSQL(renameSQL)
}

func (rc *Client) cleanTemporaryDatabase(ctx context.Context, originDB string) {
database := utils.TemporaryDBName(originDB)
log.Debug("dropping temporary database", zap.Stringer("database", database))
sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", utils.EncloseName(database.L))
if err := rc.db.se.Execute(ctx, sql); err != nil {
logutil.WarnTerm("failed to drop temporary database, it should be dropped manually",
zap.Stringer("database", database),
logutil.ShortError(err),
)
}
}
4 changes: 2 additions & 2 deletions pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ func DefineTableFlags(command *cobra.Command) {
}

// DefineFilterFlags defines the --filter and --case-sensitive flags for `full` subcommand.
func DefineFilterFlags(command *cobra.Command) {
func DefineFilterFlags(command *cobra.Command, defaultFilter []string) {
flags := command.Flags()
flags.StringArrayP(flagFilter, "f", []string{"*.*"}, "select tables to process")
flags.StringArrayP(flagFilter, "f", defaultFilter, "select tables to process")
flags.Bool(flagCaseSensitive, false, "whether the table names used in --filter should be case-sensitive")
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,10 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
return errors.Trace(err)
}

// The cost of rename user table / replace into system table wouldn't be so high.
// So leave it out of the pipeline for easier implementation.
client.RestoreSystemSchemas(ctx, cfg.TableFilter)

// Set task summary to success status.
summary.SetSuccessStatus(true)
return nil
Expand Down
18 changes: 18 additions & 0 deletions pkg/utils/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ package utils

import (
"encoding/json"
"fmt"
"strings"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/tablecodec"

Expand Down Expand Up @@ -155,3 +157,19 @@ func ArchiveSize(meta *backuppb.BackupMeta) uint64 {
func EncloseName(name string) string {
return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// EncloseDBAndTable formats the database and table name in sql.
func EncloseDBAndTable(database, table string) string {
return fmt.Sprintf("%s.%s", EncloseName(database), EncloseName(table))
}

// IsSysDB tests whether the database is system DB.
// Currently, the only system DB is mysql.
func IsSysDB(dbLowerName string) bool {
return dbLowerName == mysql.SystemDB
}

// TemporaryDBName makes a 'private' database name.
func TemporaryDBName(db string) model.CIStr {
return model.NewCIStr("__TiDB_BR_Temporary_" + db)
}
2 changes: 1 addition & 1 deletion tests/br_full_ddl/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ cluster_index_before_backup=$(run_sql "show variables like '%cluster%';" | awk '
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/$DB" --ratelimit 5 --concurrency 4 --log-file $LOG --ignore-stats=false || cat $LOG
checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs)

if [ "${checksum_count}" != "1" ];then
if [ "${checksum_count}" -lt "1" ];then
echo "TEST: [$TEST_NAME] fail on fast checksum"
echo $(cat $LOG | grep checksum)
exit 1
Expand Down
4 changes: 2 additions & 2 deletions tests/br_full_index/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ BR_LOG_TO_TERM=1

checksum_count=$(cat $LOG | grep "checksum success" | wc -l | xargs)

if [ "${checksum_count}" != "$DB_COUNT" ];then
echo "TEST: [$TEST_NAME] fail on fast checksum"
if [ "${checksum_count}" -lt "$DB_COUNT" ];then
echo "TEST: [$TEST_NAME] fail on fast checksum: required $DB_COUNT databases checked, but only ${checksum_count} dbs checked"
echo $(cat $LOG | grep checksum)
exit 1
fi
Expand Down
52 changes: 52 additions & 0 deletions tests/br_systables/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#! /bin/bash

set -eux

backup_dir=$TEST_DIR/$TEST_NAME

test_data="('TiDB'),('TiKV'),('TiFlash'),('TiSpark'),('TiCDC'),('TiPB'),('Rust'),('C++'),('Go'),('Haskell'),('Scala')"

modify_systables() {
run_sql "CREATE USER 'Alyssa P. Hacker'@'%' IDENTIFIED BY 'password';"
run_sql "UPDATE mysql.tidb SET VARIABLE_VALUE = '1h' WHERE VARIABLE_NAME = 'tikv_gc_life_time';"

run_sql "CREATE TABLE mysql.foo(pk int primary key auto_increment, field varchar(255));"
run_sql "CREATE TABLE mysql.bar(pk int primary key auto_increment, field varchar(255));"

run_sql "INSERT INTO mysql.foo(field) VALUES $test_data"
run_sql "INSERT INTO mysql.bar(field) VALUES $test_data"

go-ycsb load mysql -P tests/"$TEST_NAME"/workload \
-p mysql.host="$TIDB_IP" \
-p mysql.port="$TIDB_PORT" \
-p mysql.user=root \
-p mysql.db=mysql

run_sql "ANALYZE TABLE mysql.usertable;"
}

rollback_modify() {
run_sql "DROP TABLE mysql.foo;"
run_sql "DROP TABLE mysql.bar;"
run_sql "UPDATE mysql.tidb SET VARIABLE_VALUE = '10m' WHERE VARIABLE_NAME = 'tikv_gc_life_time';"
run_sql "DROP USER 'Alyssa P. Hacker';"
run_sql "DROP TABLE mysql.usertable;"
}

check() {
run_sql "SELECT count(*) from mysql.foo;" | grep 11
run_sql "SELECT count(*) from mysql.usertable;" | grep 1000
run_sql "SHOW TABLES IN mysql;" | grep -v bar

# TODO remove this after supporting auto flush.
run_sql "FLUSH PRIVILEGES;"
run_sql "SELECT CURRENT_USER();" -u'Alyssa P. Hacker' -p'password' | grep 'Alyssa P. Hacker'
run_sql "SHOW DATABASES" | grep -v '__TiDB_BR_Temporary_'
# TODO check stats after supportting.
}

modify_systables
run_br backup full -s "local://$backup_dir"
rollback_modify
run_br restore full -f '*.*' -f '!mysql.bar' -s "local://$backup_dir"
check
Loading