Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

backup: dump stats to json should not be fatal #682

Merged
merged 3 commits into from
Jan 11, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,12 @@ func BuildBackupRangeAndSchema(
// Skip tables other than the given table.
continue
}

logger := log.With(
zap.String("db", dbInfo.Name.O),
zap.String("table", tableInfo.Name.O),
)

var globalAutoID int64
switch {
case tableInfo.IsSequence():
Expand All @@ -314,14 +320,10 @@ func BuildBackupRangeAndSchema(
return nil, nil, errors.Trace(err)
}
tableInfo.AutoRandID = globalAutoRandID
log.Info("change table AutoRandID",
zap.Stringer("db", dbInfo.Name),
zap.Stringer("table", tableInfo.Name),
logger.Info("change table AutoRandID",
zap.Int64("AutoRandID", globalAutoRandID))
}
log.Info("change table AutoIncID",
zap.Stringer("db", dbInfo.Name),
zap.Stringer("table", tableInfo.Name),
logger.Info("change table AutoIncID",
zap.Int64("AutoIncID", globalAutoID))

// remove all non-public indices
Expand Down Expand Up @@ -349,11 +351,12 @@ func BuildBackupRangeAndSchema(
if !ignoreStats {
jsonTable, err := h.DumpStatsToJSON(dbInfo.Name.String(), tableInfo, nil)
if err != nil {
return nil, nil, errors.Trace(err)
}
stats, err = json.Marshal(jsonTable)
if err != nil {
return nil, nil, errors.Trace(err)
logger.Error("dump table stats failed", logutil.ShortError(err))
} else {
stats, err = json.Marshal(jsonTable)
if err != nil {
logger.Error("dump table stats failed (cannot serialize)", logutil.ShortError(err))
}
}
}

Expand Down
62 changes: 59 additions & 3 deletions pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ func (s *testBackupSchemaSuite) SetUpSuite(c *C) {
var err error
s.mock, err = mock.NewCluster()
c.Assert(err, IsNil)
c.Assert(s.mock.Start(), IsNil)
}

func (s *testBackupSchemaSuite) TearDownSuite(c *C) {
s.mock.Stop()
testleak.AfterTest(c)()
}

Expand All @@ -52,9 +54,6 @@ func (sp *simpleProgress) get() int64 {
}

func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
c.Assert(s.mock.Start(), IsNil)
defer s.mock.Stop()

tk := testkit.NewTestKit(c, s.mock.Storage)

// Table t1 is not exist.
Expand Down Expand Up @@ -124,3 +123,60 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
c.Assert(schemas[1].TotalKvs, Not(Equals), 0, Commentf("%v", schemas[1]))
c.Assert(schemas[1].TotalBytes, Not(Equals), 0, Commentf("%v", schemas[1]))
}

func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchemaWithBrokenStats(c *C) {
tk := testkit.NewTestKit(c, s.mock.Storage)
tk.MustExec("use test")
tk.MustExec("drop table if exists t3;")
tk.MustExec("create table t3 (a char(1));")
tk.MustExec("insert into t3 values ('1');")
tk.MustExec("analyze table t3;")
// corrupt the statistics like pingcap/br#679.
tk.MustExec(`
update mysql.stats_buckets set upper_bound = 0xffffffff
where table_id = (
select tidb_table_id from information_schema.tables
where (table_schema, table_name) = ('test', 't3')
);
`)

f, err := filter.Parse([]string{"test.t3"})
c.Assert(err, IsNil)

_, backupSchemas, err := backup.BuildBackupRangeAndSchema(s.mock.Domain, s.mock.Storage, f, math.MaxUint64, false)
c.Assert(err, IsNil)
c.Assert(backupSchemas.Len(), Equals, 1)

updateCh := new(simpleProgress)
backupSchemas.Start(context.Background(), s.mock.Storage, math.MaxUint64, 1, variable.DefChecksumTableConcurrency, updateCh)
schemas, err := backupSchemas.FinishTableChecksum()

// the stats should be empty, but other than that everything should be backed up.
c.Assert(err, IsNil)
c.Assert(schemas[0].Stats, HasLen, 0)
c.Assert(schemas[0].Crc64Xor, Not(Equals), 0)
c.Assert(schemas[0].TotalKvs, Not(Equals), 0)
c.Assert(schemas[0].TotalBytes, Not(Equals), 0)
c.Assert(schemas[0].Table, Not(HasLen), 0)
c.Assert(schemas[0].Db, Not(HasLen), 0)

// recover the statistics.
tk.MustExec("analyze table t3;")

_, backupSchemas, err = backup.BuildBackupRangeAndSchema(s.mock.Domain, s.mock.Storage, f, math.MaxUint64, false)
c.Assert(err, IsNil)
c.Assert(backupSchemas.Len(), Equals, 1)

updateCh.reset()
backupSchemas.Start(context.Background(), s.mock.Storage, math.MaxUint64, 1, variable.DefChecksumTableConcurrency, updateCh)
schemas2, err := backupSchemas.FinishTableChecksum()

// the stats should now be filled, and other than that the result should be equivalent to the first backup.
c.Assert(err, IsNil)
c.Assert(schemas2[0].Stats, Not(HasLen), 0)
c.Assert(schemas2[0].Crc64Xor, Equals, schemas[0].Crc64Xor)
c.Assert(schemas2[0].TotalKvs, Equals, schemas[0].TotalKvs)
c.Assert(schemas2[0].TotalBytes, Equals, schemas[0].TotalBytes)
c.Assert(schemas2[0].Table, DeepEquals, schemas[0].Table)
c.Assert(schemas2[0].Db, DeepEquals, schemas[0].Db)
}
20 changes: 9 additions & 11 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,11 +765,13 @@ func (rc *Client) GoValidateChecksum(
}

func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient kv.Client, concurrency uint) error {
logger := log.With(
zap.String("db", tbl.OldTable.DB.Name.O),
zap.String("table", tbl.OldTable.Info.Name.O),
)

if tbl.OldTable.NoChecksum() {
log.Warn("table has no checksum, skipping checksum",
zap.Stringer("table", tbl.OldTable.Info.Name),
zap.Stringer("database", tbl.OldTable.DB.Name),
)
logger.Warn("table has no checksum, skipping checksum")
return nil
}

Expand Down Expand Up @@ -801,9 +803,7 @@ func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient k
if checksumResp.Checksum != table.Crc64Xor ||
checksumResp.TotalKvs != table.TotalKvs ||
checksumResp.TotalBytes != table.TotalBytes {
log.Error("failed in validate checksum",
zap.String("database", table.DB.Name.L),
zap.String("table", table.Info.Name.L),
logger.Error("failed in validate checksum",
zap.Uint64("origin tidb crc64", table.Crc64Xor),
zap.Uint64("calculated crc64", checksumResp.Checksum),
zap.Uint64("origin tidb total kvs", table.TotalKvs),
Expand All @@ -814,14 +814,12 @@ func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient k
return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum")
}
if table.Stats != nil {
log.Info("start loads analyze after validate checksum",
zap.Stringer("db name", tbl.OldTable.DB.Name),
zap.Stringer("name", tbl.OldTable.Info.Name),
logger.Info("start loads analyze after validate checksum",
zap.Int64("old id", tbl.OldTable.Info.ID),
zap.Int64("new id", tbl.Table.ID),
)
if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil {
log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err))
logger.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err))
}
}
return nil
Expand Down