Commit

Merge branch 'cp-fix-display' of https://github.com/yujuncen/tidb into cp-fix-display

YuJuncen committed Jul 18, 2022
2 parents 1d6f1e2 + f0acd5f commit 55f9db9
Showing 99 changed files with 3,471 additions and 221 deletions.
1 change: 0 additions & 1 deletion .bazelrc
@@ -12,7 +12,6 @@ build:release --config=ci
build --incompatible_strict_action_env --incompatible_enable_cc_toolchain_resolution
build:ci --remote_cache=http://172.16.4.21:8080/tidb --remote_timeout="10s"
test:ci --verbose_failures
test:ci --test_timeout=180
test:ci --test_env=GO_TEST_WRAP_TESTV=1 --test_verbose_timeout_warnings
test:ci --remote_cache=http://172.16.4.21:8080/tidb --remote_timeout="10s"
test:ci --test_env=TZ=Asia/Shanghai --test_output=errors --experimental_ui_max_stdouterr_bytes=104857600
12 changes: 6 additions & 6 deletions DEPS.bzl
@@ -2510,8 +2510,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:VKMmvYhtG28j1sCCBdq4s+V9UOYqNgQ6CQviQwOgTeg=",
version = "v0.0.0-20220705090230-a5d4ffd2ba33",
sum = "h1:PAXtUVMJnyQQS8t9GzihIFmh6FBXu0JziWbIVknLniA=",
version = "v0.0.0-20220711062932-08b02befd813",
)
go_repository(
name = "com_github_pingcap_log",
@@ -2531,8 +2531,8 @@ def go_deps():
name = "com_github_pingcap_tipb",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/tipb",
sum = "h1:XaTE4ZhQbQtQZtAVzlZh/Pf6SjFfMSTe1ia2nGcl36Y=",
version = "v0.0.0-20220706024432-7be3cc83a7d5",
sum = "h1:hE1dQdnvxWCHhD0snX67paV9y6inq8TxVFbsKqjaTQk=",
version = "v0.0.0-20220714100504-7d3474676bc9",
)
go_repository(
name = "com_github_pkg_browser",
@@ -3012,8 +3012,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:RpH/obpgyNKkXV4Wt8PqSdcUTnqWyExPcla+qdTVgi0=",
version = "v2.0.1-0.20220711061028-1c198aab9585",
sum = "h1:nbcwXbkilywhMoAseLPzg/VHdFNhMEWy6JeqL/Gmq7A=",
version = "v2.0.1-0.20220713085647-57c12f7c64f6",
)
go_repository(
name = "com_github_tikv_pd_client",
8 changes: 8 additions & 0 deletions br/cmd/br/cmd.go
@@ -36,6 +36,14 @@ var (
"*.*",
fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")),
"!mysql.*",
"mysql.user",
"mysql.db",
"mysql.tables_priv",
"mysql.columns_priv",
"mysql.global_priv",
"mysql.global_grants",
"mysql.default_roles",
"mysql.role_edges",
"!sys.*",
"!INFORMATION_SCHEMA.*",
"!PERFORMANCE_SCHEMA.*",
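The entries added above re-include the privilege tables after the blanket "!mysql.*" exclusion: in TiDB table filters, "!" marks an exclusion rule and the last matching rule decides. A minimal self-contained sketch of that evaluation order (illustration only, using path.Match globs instead of BR's actual filter parser):

// Illustrative only; not BR's filter implementation.
package main

import (
	"fmt"
	"path"
)

// matchesFilter applies an ordered rule list in which "!" negates a rule and
// the last matching rule decides; rules are simplified to "<schema>.<table>" globs.
func matchesFilter(rules []string, schema, table string) bool {
	allowed := false
	name := schema + "." + table
	for _, rule := range rules {
		negate := false
		if len(rule) > 0 && rule[0] == '!' {
			negate = true
			rule = rule[1:]
		}
		if ok, _ := path.Match(rule, name); ok {
			allowed = !negate
		}
	}
	return allowed
}

func main() {
	rules := []string{"*.*", "!mysql.*", "mysql.user", "!sys.*"}
	fmt.Println(matchesFilter(rules, "test", "t1"))     // true: kept by *.*
	fmt.Println(matchesFilter(rules, "mysql", "stats")) // false: dropped by !mysql.*
	fmt.Println(matchesFilter(rules, "mysql", "user"))  // true: re-included by mysql.user
}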
23 changes: 23 additions & 0 deletions br/cmd/br/restore.go
@@ -3,8 +3,11 @@
package main

import (
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/gluetikv"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/task"
@@ -38,11 +41,31 @@ func runRestoreCommand(command *cobra.Command, cmdName string) error {
}
if err := task.RunRestore(GetDefaultContext(), tidbGlue, cmdName, &cfg); err != nil {
log.Error("failed to restore", zap.Error(err))
printWorkaroundOnFullRestoreError(command, err)
return errors.Trace(err)
}
return nil
}

// printWorkaroundOnFullRestoreError prints a workaround when a not-fresh or
// incompatible-system-table error occurs during a full cluster restore.
func printWorkaroundOnFullRestoreError(command *cobra.Command, err error) {
if !errors.ErrorEqual(err, berrors.ErrRestoreNotFreshCluster) &&
!errors.ErrorEqual(err, berrors.ErrRestoreIncompatibleSys) {
return
}
fmt.Println("#######################################################################")
switch {
case errors.ErrorEqual(err, berrors.ErrRestoreNotFreshCluster):
fmt.Println("# the target cluster is not fresh, br cannot restore system tables.")
case errors.ErrorEqual(err, berrors.ErrRestoreIncompatibleSys):
fmt.Println("# the target cluster is not compatible with the backup data,")
fmt.Println("# br cannot restore system tables.")
}
fmt.Println("# you can use the following command to skip restoring system tables:")
fmt.Printf("# br restore %s --filter '*.*' --filter '!mysql.*' \n", command.Use)
fmt.Println("#######################################################################")
}
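The error matching above depends on pingcap/errors comparing underlying causes: the restore code wraps these errors with errors.Annotate and errors.Trace before they reach this check, yet errors.ErrorEqual still recognizes them. A minimal sketch of that behavior, assuming the usual pingcap/errors semantics (Cause-based comparison) and using a placeholder error instead of BR's normalized errors:

// Minimal sketch; errNotFresh stands in for berrors.ErrRestoreNotFreshCluster.
package main

import (
	"fmt"

	"github.com/pingcap/errors"
)

var errNotFresh = errors.New("cluster is not fresh")

func checkFresh() error {
	// Annotate keeps errNotFresh as the cause while adding context.
	return errors.Annotate(errNotFresh, "user db/tables: test.t1")
}

func main() {
	err := errors.Trace(checkFresh())
	// ErrorEqual compares the unwrapped causes, so the wrapping above does not break the match.
	fmt.Println(errors.ErrorEqual(err, errNotFresh)) // true
}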

func runRestoreRawCommand(command *cobra.Command, cmdName string) error {
cfg := task.RestoreRawConfig{
RawKvConfig: task.RawKvConfig{Config: task.Config{LogProgress: HasLogFile()}},
2 changes: 2 additions & 0 deletions br/pkg/errors/errors.go
@@ -62,6 +62,8 @@ var (
ErrRestoreInvalidRange = errors.Normalize("invalid restore range", errors.RFCCodeText("BR:Restore:ErrRestoreInvalidRange"))
ErrRestoreWriteAndIngest = errors.Normalize("failed to write and ingest", errors.RFCCodeText("BR:Restore:ErrRestoreWriteAndIngest"))
ErrRestoreSchemaNotExists = errors.Normalize("schema not exists", errors.RFCCodeText("BR:Restore:ErrRestoreSchemaNotExists"))
ErrRestoreNotFreshCluster = errors.Normalize("cluster is not fresh", errors.RFCCodeText("BR:Restore:ErrRestoreNotFreshCluster"))
ErrRestoreIncompatibleSys = errors.Normalize("incompatible system table", errors.RFCCodeText("BR:Restore:ErrRestoreIncompatibleSys"))
ErrUnsupportedSystemTable = errors.Normalize("the system table isn't supported for restoring yet", errors.RFCCodeText("BR:Restore:ErrUnsupportedSysTable"))
ErrDatabasesAlreadyExisted = errors.Normalize("databases already existed in restored cluster", errors.RFCCodeText("BR:Restore:ErrDatabasesAlreadyExisted"))

143 changes: 141 additions & 2 deletions br/pkg/restore/client.go
@@ -37,11 +37,13 @@ import (
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/config"
ddlutil "github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/statistics/handle"
"github.com/pingcap/tidb/store/pdtypes"
"github.com/pingcap/tidb/tablecodec"
@@ -147,6 +149,16 @@ type Client struct {

storage storage.ExternalStorage

// if fullClusterRestore = true:
// - if there are system tables in the backup (backup data from BR >= 5.1.0), the target cluster must be a
// fresh cluster without user databases or tables, and the privilege-related system tables are restored
// together with the user data.
// - if there are no system tables in the backup (backup data from BR < 5.1.0), restore all user data just as
// previous versions did.
// if fullClusterRestore = false, restore all user data just as previous versions did.
// fullClusterRestore is true when there is no explicit filter setting and the command is a full restore or a
// point restore with full backup data.
// todo: maybe change to an enum
fullClusterRestore bool
// the query to insert rows into table `gc_delete_range`, lack of ts.
deleteRangeQuery []string
deleteRangeQueryCh chan string
Expand Down Expand Up @@ -494,6 +506,14 @@ func (rc *Client) GetDatabase(name string) *utils.Database {
return rc.databases[name]
}

// HasBackedUpSysDB returns whether the backup includes system tables.
// BR has backed up system tables since 5.1.0.
func (rc *Client) HasBackedUpSysDB() bool {
temporaryDB := utils.TemporaryDBName(mysql.SystemDB)
_, backedUp := rc.databases[temporaryDB.O]
return backedUp
}
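HasBackedUpSysDB works because the backup stores system tables under a renamed temporary database. A sketch of the assumed naming scheme follows; the exact prefix used by utils.TemporaryDBName is an assumption, not taken from this diff.

// Sketch only; the prefix value is assumed, utils.TemporaryDBName is not shown here.
package main

import (
	"fmt"
	"strings"
)

const temporaryDBPrefix = "__TiDB_BR_Temporary_" // assumed prefix

func temporaryDBName(db string) string { return temporaryDBPrefix + db }

// decodeTemporaryDBName is the reverse mapping that utils.GetSysDBCIStrName
// (used in CheckSysTableCompatibility below) presumably performs.
func decodeTemporaryDBName(db string) (string, bool) {
	if !strings.HasPrefix(db, temporaryDBPrefix) {
		return db, false
	}
	return strings.TrimPrefix(db, temporaryDBPrefix), true
}

func main() {
	fmt.Println(temporaryDBName("mysql"))                           // __TiDB_BR_Temporary_mysql
	fmt.Println(decodeTemporaryDBName("__TiDB_BR_Temporary_mysql")) // mysql true
}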

// GetPlacementPolicies returns policies.
func (rc *Client) GetPlacementPolicies() (*sync.Map, error) {
policies := &sync.Map{}
@@ -834,6 +854,101 @@ func (rc *Client) createTablesInWorkerPool(ctx context.Context, dom *domain.Doma
return eg.Wait()
}

// CheckTargetClusterFresh checks whether the target cluster is fresh.
// If there are no user databases or tables, we treat it as a fresh cluster, even though
// the user may have created accounts or made other changes.
func (rc *Client) CheckTargetClusterFresh(ctx context.Context) error {
log.Info("checking whether target cluster is fresh")
userDBs := GetExistedUserDBs(rc.dom)
if len(userDBs) == 0 {
return nil
}

const maxPrintCount = 10
userTableOrDBNames := make([]string, 0, maxPrintCount+1)
addName := func(name string) bool {
if len(userTableOrDBNames) == maxPrintCount {
userTableOrDBNames = append(userTableOrDBNames, "...")
return false
}
userTableOrDBNames = append(userTableOrDBNames, name)
return true
}
outer:
for _, db := range userDBs {
if !addName(db.Name.L) {
break outer
}
for _, tbl := range db.Tables {
if !addName(tbl.Name.L) {
break outer
}
}
}
log.Error("not fresh cluster", zap.Strings("user tables", userTableOrDBNames))
return errors.Annotate(berrors.ErrRestoreNotFreshCluster, "user db/tables: "+strings.Join(userTableOrDBNames, ", "))
}
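GetExistedUserDBs is not part of this diff; the sketch below shows the assumed notion of a "user database" (everything outside the system schemas that the default filter in br/cmd/br/cmd.go also excludes). The helper name and schema list are assumptions, and the real helper may additionally ignore the default empty test database.

// Illustrative stand-in for GetExistedUserDBs; not the actual implementation.
package main

import (
	"fmt"
	"strings"
)

// systemSchemas mirrors the schemas excluded by the default restore filter.
var systemSchemas = map[string]bool{
	"mysql":              true,
	"sys":                true,
	"information_schema": true,
	"performance_schema": true,
	"metrics_schema":     true,
}

func existedUserDBs(allSchemas []string) []string {
	var user []string
	for _, db := range allSchemas {
		if !systemSchemas[strings.ToLower(db)] {
			user = append(user, db)
		}
	}
	return user
}

func main() {
	// A cluster with an "app" schema would be reported as not fresh.
	fmt.Println(existedUserDBs([]string{"mysql", "INFORMATION_SCHEMA", "app"})) // [app]
}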

func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metautil.Table) error {
log.Info("checking target cluster system table compatibility with backed up data")
privilegeTablesInBackup := make([]*metautil.Table, 0)
for _, table := range tables {
decodedSysDBName, ok := utils.GetSysDBCIStrName(table.DB.Name)
if ok && utils.IsSysDB(decodedSysDBName.L) && sysPrivilegeTableSet[table.Info.Name.L] {
privilegeTablesInBackup = append(privilegeTablesInBackup, table)
}
}
sysDB := model.NewCIStr(mysql.SystemDB)
for _, table := range privilegeTablesInBackup {
ti, err := rc.GetTableSchema(dom, sysDB, table.Info.Name)
if err != nil {
log.Error("missing table on target cluster", zap.Stringer("table", table.Info.Name))
return errors.Annotate(berrors.ErrRestoreIncompatibleSys, "missed system table: "+table.Info.Name.O)
}
backupTi := table.Info
if len(ti.Columns) != len(backupTi.Columns) {
log.Error("column count mismatch",
zap.Stringer("table", table.Info.Name),
zap.Int("col in cluster", len(ti.Columns)),
zap.Int("col in backup", len(backupTi.Columns)))
return errors.Annotatef(berrors.ErrRestoreIncompatibleSys,
"column count mismatch, table: %s, col in cluster: %d, col in backup: %d",
table.Info.Name.O, len(ti.Columns), len(backupTi.Columns))
}
backupColMap := make(map[string]*model.ColumnInfo)
for i := range backupTi.Columns {
col := backupTi.Columns[i]
backupColMap[col.Name.L] = col
}
// column order can differ, but the types must be compatible
for i := range ti.Columns {
col := ti.Columns[i]
backupCol := backupColMap[col.Name.L]
if backupCol == nil {
log.Error("missing column in backup data",
zap.Stringer("table", table.Info.Name),
zap.String("col", fmt.Sprintf("%s %s", col.Name, col.FieldType.String())))
return errors.Annotatef(berrors.ErrRestoreIncompatibleSys,
"missing column in backup data, table: %s, col: %s %s",
table.Info.Name.O,
col.Name, col.FieldType.String())
}
if !utils.IsTypeCompatible(backupCol.FieldType, col.FieldType) {
log.Error("incompatible column",
zap.Stringer("table", table.Info.Name),
zap.String("col in cluster", fmt.Sprintf("%s %s", col.Name, col.FieldType.String())),
zap.String("col in backup", fmt.Sprintf("%s %s", backupCol.Name, backupCol.FieldType.String())))
return errors.Annotatef(berrors.ErrRestoreIncompatibleSys,
"incompatible column, table: %s, col in cluster: %s %s, col in backup: %s %s",
table.Info.Name.O,
col.Name, col.FieldType.String(),
backupCol.Name, backupCol.FieldType.String())
}
}
}
return nil
}
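utils.IsTypeCompatible is also outside this diff. Below is a self-contained sketch of one plausible compatibility rule (same base type, and the cluster column at least as wide as the backed-up one); the actual check may be stricter or looser.

// Assumed rule only; not utils.IsTypeCompatible.
package main

import "fmt"

// columnType is a simplified stand-in for parser/types.FieldType.
type columnType struct {
	tp   string // base type, e.g. "varchar", "bigint"
	flen int    // display/storage width
}

func isTypeCompatible(backup, cluster columnType) bool {
	// The backed-up column can be restored if the cluster column has the same
	// base type and is at least as wide.
	return backup.tp == cluster.tp && cluster.flen >= backup.flen
}

func main() {
	fmt.Println(isTypeCompatible(columnType{"varchar", 64}, columnType{"varchar", 255})) // true
	fmt.Println(isTypeCompatible(columnType{"varchar", 255}, columnType{"varchar", 64})) // false
}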

// ExecDDLs executes the queries of the ddl jobs.
func (rc *Client) ExecDDLs(ctx context.Context, ddlJobs []*model.Job) error {
// Sort the ddl jobs by schema version in ascending order.
@@ -1467,6 +1582,14 @@ func (rc *Client) getRuleID(tableID int64) string {
return "restore-t" + strconv.FormatInt(tableID, 10)
}

// IsFull returns whether this backup is full.
func (rc *Client) IsFull() bool {
failpoint.Inject("mock-incr-backup-data", func() {
failpoint.Return(false)
})
return !rc.IsIncremental()
}

// IsIncremental returns whether this backup is incremental.
func (rc *Client) IsIncremental() bool {
return !(rc.backupMeta.StartVersion == rc.backupMeta.EndVersion ||
@@ -1842,7 +1965,7 @@ func (rc *Client) InitSchemasReplaceForDDL(
}()...)
}

return stream.NewSchemasReplace(dbMap, rc.currentTS, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs), nil
return stream.NewSchemasReplace(dbMap, rc.currentTS, tableFilter, rc.GenGlobalID, rc.GenGlobalIDs, rc.InsertDeleteRangeForTable, rc.InsertDeleteRangeForIndex), nil
}

// RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup.
@@ -1952,7 +2075,7 @@ func (rc *Client) RestoreMetaKVFile(
}
log.Debug("txn entry", zap.Uint64("key-ts", ts), zap.Int("txnKey-len", len(txnEntry.Key)),
zap.Int("txnValue-len", len(txnEntry.Value)), zap.ByteString("txnKey", txnEntry.Key))
newEntry, err := sr.RewriteKvEntry(&txnEntry, file.Cf, rc.InsertDeleteRangeForTable, rc.InsertDeleteRangeForIndex)
newEntry, err := sr.RewriteKvEntry(&txnEntry, file.Cf)
if err != nil {
log.Error("rewrite txn entry failed", zap.Int("klen", len(txnEntry.Key)),
logutil.Key("txn-key", txnEntry.Key))
@@ -2188,6 +2311,22 @@ func (rc *Client) SaveSchemas(
return nil
}

// InitFullClusterRestore initializes fullClusterRestore and sets SkipGrantTable as needed.
func (rc *Client) InitFullClusterRestore(explicitFilter bool) {
rc.fullClusterRestore = !explicitFilter && rc.IsFull()

log.Info("full cluster restore", zap.Bool("value", rc.fullClusterRestore))

if rc.fullClusterRestore {
// have to skip grant table, in order to NotifyUpdatePrivilege
config.GetGlobalConfig().Security.SkipGrantTable = true
}
}

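// IsFullClusterRestore returns whether this restore is treated as a full cluster restore (see InitFullClusterRestore).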
func (rc *Client) IsFullClusterRestore() bool {
return rc.fullClusterRestore
}

// MockClient create a fake client used to test.
func MockClient(dbs map[string]*utils.Database) *Client {
return &Client{databases: dbs}