Skip to content

Commit

Permalink
DM: add checkpoint test for lightning-loader (#3220)
Browse files Browse the repository at this point in the history
  • Loading branch information
Little-Wallace authored Nov 18, 2021
1 parent 9311b9c commit eb64839
Show file tree
Hide file tree
Showing 16 changed files with 620 additions and 26 deletions.
3 changes: 2 additions & 1 deletion dm/cmd/dm-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func main() {
common.PrintLinesf("init logger error %s", terror.Message(err))
os.Exit(2)
}
lightningLog.SetAppLogger(log.L().Logger)

utils.LogHTTPProxies(true)

Expand All @@ -65,6 +64,8 @@ func main() {
lg, r, _ := globalLog.InitLogger(conf)
lg = lg.With(zap.String("component", "ddl tracker"))
globalLog.ReplaceGlobals(lg, r)
lightningLogger := lg.With(zap.String("component", "lightning"))
lightningLog.SetAppLogger(lightningLogger)

utils.PrintInfo("dm-worker", func() {
log.L().Info("", zap.Stringer("dm-worker config", cfg))
Expand Down
1 change: 1 addition & 0 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,7 @@ func withHost(addr string) string {

func (s *Server) removeMetaData(ctx context.Context, taskName, metaSchema string, toDBCfg *config.DBConfig) error {
toDBCfg.Adjust()

// clear shard meta data for pessimistic/optimist
err := s.pessimist.RemoveMetaData(taskName)
if err != nil {
Expand Down
103 changes: 102 additions & 1 deletion dm/loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package loader

import (
"context"
"encoding/json"
"fmt"
"strings"
Expand All @@ -31,6 +32,10 @@ import (
"go.uber.org/zap"
)

const (
	// LightningCheckpointListName is the name of the downstream table that
	// records, per (task_name, worker_name) pair, which lightning checkpoint
	// databases have been registered (see LightningCheckpointList below).
	LightningCheckpointListName = "lightning_checkpoint_list"
)

// CheckPoint represents checkpoint status.
type CheckPoint interface {
// Load loads all checkpoints recorded before.
Expand Down Expand Up @@ -100,7 +105,7 @@ type RemoteCheckPoint struct {
}

func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) (CheckPoint, error) {
db, dbConns, err := createConns(tctx, cfg, 1)
db, dbConns, err := createConns(tctx, cfg, cfg.Name, cfg.SourceID, 1)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -464,3 +469,99 @@ func (cp *RemoteCheckPoint) String() string {
}
return string(bytes)
}

// LightningCheckpointList manages the downstream bookkeeping table that
// tracks which (worker, task) pairs own lightning checkpoint databases,
// so they can be dropped when a task's metadata is removed.
type LightningCheckpointList struct {
	db        *conn.BaseDB
	schema    string // quoted meta-schema name that holds the list table
	tableName string // fully qualified (schema.table) name of the list table
	logger    log.Logger
}

// NewLightningCheckpointList creates a checkpoint-list accessor backed by db,
// storing its bookkeeping table inside metaSchema.
func NewLightningCheckpointList(db *conn.BaseDB, metaSchema string) *LightningCheckpointList {
	list := &LightningCheckpointList{
		db:     db,
		schema: dbutil.ColumnName(metaSchema),
	}
	list.tableName = dbutil.TableName(metaSchema, LightningCheckpointListName)
	list.logger = log.L().WithFields(zap.String("component", "lightning checkpoint database list"))
	return list
}

// Prepare creates the meta schema and the checkpoint-list table downstream.
// Both statements use IF NOT EXISTS, so it is safe to call repeatedly.
func (cp *LightningCheckpointList) Prepare(ctx context.Context) error {
	connection, err := cp.db.GetBaseConn(ctx)
	if err != nil {
		return terror.WithScope(terror.Annotate(err, "initialize connection when prepare"), terror.ScopeDownstream)
	}
	tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint")))
	// The meta schema must exist before the list table can be created in it.
	createSchema := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", cp.schema)
	_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{createSchema})
	if err != nil {
		// Wrap with downstream scope for consistency with the
		// table-creation error path below (was returned unwrapped).
		return terror.WithScope(err, terror.ScopeDownstream)
	}
	createTable := `CREATE TABLE IF NOT EXISTS %s (
		worker_name varchar(255) NOT NULL,
		task_name varchar(255) NOT NULL,
		PRIMARY KEY (task_name, worker_name)
	);
`
	createTableSQL := fmt.Sprintf(createTable, cp.tableName)
	_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{createTableSQL})
	return terror.WithScope(err, terror.ScopeDownstream)
}

// RegisterCheckPoint records a (worker, task) pair in the checkpoint list.
// INSERT IGNORE makes re-registration of an existing pair a no-op.
func (cp *LightningCheckpointList) RegisterCheckPoint(ctx context.Context, workerName, taskName string) error {
	baseConn, err := cp.db.GetBaseConn(ctx)
	if err != nil {
		return terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream)
	}

	insertSQL := fmt.Sprintf("INSERT IGNORE INTO %s (`worker_name`, `task_name`) VALUES(?,?)", cp.tableName)
	cp.logger.Info("initial checkpoint record",
		zap.String("sql", insertSQL),
		zap.String("worker-name", workerName),
		zap.String("task-name", taskName))
	tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint")))
	if _, err = baseConn.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{insertSQL}, []interface{}{workerName, taskName}); err != nil {
		return terror.WithScope(terror.Annotate(err, "initialize checkpoint"), terror.ScopeDownstream)
	}
	return nil
}

// RemoveTaskCheckPoint drops every per-worker lightning checkpoint database
// registered for taskName, then deletes the task's rows from the list table.
func (cp *LightningCheckpointList) RemoveTaskCheckPoint(ctx context.Context, taskName string) error {
	connection, err := cp.db.GetBaseConn(ctx)
	if err != nil {
		return terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream)
	}

	tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint")))
	query := fmt.Sprintf("SELECT `worker_name` from %s where `task_name`=?", cp.tableName)
	rows, err := connection.QuerySQL(tctx, query, taskName)
	if err != nil {
		return terror.WithScope(err, terror.ScopeDownstream)
	}
	defer rows.Close()
	var workerName string
	for rows.Next() {
		err = rows.Scan(&workerName)
		if err != nil {
			return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
		}
		// Each (worker, task) pair owns its own checkpoint database.
		cpdb := config.TiDBLightningCheckpointPrefix + dbutil.TableName(workerName, taskName)
		// BUG FIX: "DROP DATABASE IF NOT EXISTS" is invalid MySQL syntax and
		// made every drop fail; "IF EXISTS" is the correct idempotent form.
		sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", cpdb)
		_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql})
		if err != nil {
			return terror.WithScope(err, terror.ScopeDownstream)
		}
	}
	// Surface any error that terminated the iteration early before deleting
	// the list rows, so we never lose track of an undropped checkpoint db.
	if err = rows.Err(); err != nil {
		return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
	}
	query = fmt.Sprintf("DELETE from %s where `task_name`=?", cp.tableName)
	_, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{query}, []interface{}{taskName})
	return terror.WithScope(err, terror.ScopeDownstream)
}

// Close implements CheckPoint.Close. It closes the underlying database
// handle; a close failure is logged but deliberately not returned.
func (cp *LightningCheckpointList) Close() {
	if err := cp.db.Close(); err != nil {
		cp.logger.Error("close checkpoint list db", log.ShortError(err))
	}
}
15 changes: 9 additions & 6 deletions dm/loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (
// DBConn represents a live DB connection
// it's not thread-safe.
type DBConn struct {
cfg *config.SubTaskConfig
name string
sourceID string
baseConn *conn.BaseConn

// generate new BaseConn and close old one
Expand Down Expand Up @@ -89,7 +90,7 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf
cost := time.Since(startTime)
// duration seconds
ds := cost.Seconds()
queryHistogram.WithLabelValues(conn.cfg.Name, conn.cfg.SourceID).Observe(ds)
queryHistogram.WithLabelValues(conn.name, conn.sourceID).Observe(ds)
if ds > 1 {
ctx.L().Warn("query statement too slow",
zap.Duration("cost time", cost),
Expand Down Expand Up @@ -123,7 +124,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
FirstRetryDuration: 2 * time.Second,
BackoffStrategy: retry.LinearIncrease,
IsRetryableFn: func(retryTime int, err error) bool {
tidbExecutionErrorCounter.WithLabelValues(conn.cfg.Name, conn.cfg.SourceID).Inc()
tidbExecutionErrorCounter.WithLabelValues(conn.name, conn.sourceID).Inc()
if retry.IsConnectionError(err) {
err = conn.resetConn(ctx)
if err != nil {
Expand Down Expand Up @@ -151,7 +152,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
params,
func(ctx *tcontext.Context) (interface{}, error) {
startTime := time.Now()
_, err := conn.baseConn.ExecuteSQL(ctx, stmtHistogram, conn.cfg.Name, queries, args...)
_, err := conn.baseConn.ExecuteSQL(ctx, stmtHistogram, conn.name, queries, args...)
failpoint.Inject("LoadExecCreateTableFailed", func(val failpoint.Value) {
errCode, err1 := strconv.ParseUint(val.(string), 10, 16)
if err1 != nil {
Expand Down Expand Up @@ -196,7 +197,9 @@ func (conn *DBConn) resetConn(tctx *tcontext.Context) error {
return nil
}

func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, workerCount int) (*conn.BaseDB, []*DBConn, error) {
func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig,
name, sourceID string,
workerCount int) (*conn.BaseDB, []*DBConn, error) {
baseDB, err := conn.DefaultDBProvider.Apply(&cfg.To)
if err != nil {
return nil, nil, terror.WithScope(err, terror.ScopeDownstream)
Expand All @@ -218,7 +221,7 @@ func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, workerCount
}
return baseDB.GetBaseConn(tctx.Context())
}
conns = append(conns, &DBConn{baseConn: baseConn, cfg: cfg, resetBaseConnFn: resetBaseConnFn})
conns = append(conns, &DBConn{baseConn: baseConn, name: name, sourceID: sourceID, resetBaseConnFn: resetBaseConnFn})
}
return baseDB, conns, nil
}
Expand Down
80 changes: 70 additions & 10 deletions dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package loader
import (
"context"
"path/filepath"
"strings"
"sync"

"github.com/docker/go-units"
Expand Down Expand Up @@ -53,6 +54,7 @@ type LightningLoader struct {
cfg *config.SubTaskConfig
cli *clientv3.Client
checkPoint CheckPoint
checkPointList *LightningCheckpointList
workerName string
logger log.Logger
core *lightning.Lightning
Expand Down Expand Up @@ -114,13 +116,33 @@ func (l *LightningLoader) Type() pb.UnitType {
// if fail, it should not call l.Close.
func (l *LightningLoader) Init(ctx context.Context) (err error) {
tctx := tcontext.NewContext(ctx, l.logger)
toCfg, err := l.cfg.Clone()
if err != nil {
return err
}
l.toDB, l.toDBConns, err = createConns(tctx, l.cfg, toCfg.Name, toCfg.SourceID, 1)
if err != nil {
return err
}

checkpoint, err := newRemoteCheckPoint(tctx, l.cfg, l.checkpointID())
if err == nil {
l.checkPoint = checkpoint
checkpointList := NewLightningCheckpointList(l.toDB, l.cfg.MetaSchema)
err1 := checkpointList.Prepare(ctx)
if err1 == nil {
l.checkPointList = checkpointList
}
err = err1
}
failpoint.Inject("ignoreLoadCheckpointErr", func(_ failpoint.Value) {
l.logger.Info("", zap.String("failpoint", "ignoreLoadCheckpointErr"))
err = nil
})
l.checkPoint = checkpoint
l.toDB, l.toDBConns, err = createConns(tctx, l.cfg, 1)
if err != nil {
return err
}

timeZone := l.cfg.Timezone
if len(timeZone) == 0 {
var err1 error
Expand All @@ -130,6 +152,27 @@ func (l *LightningLoader) Init(ctx context.Context) (err error) {
}
}
l.timeZone = timeZone
return nil
}

// runLightning executes a single lightning import run under a cancellable
// sub-context, publishing the cancel func so Pause/Stop can abort the run.
func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) error {
	// Store the cancel func under the lock so concurrent callers
	// (e.g. Pause) observe a consistent l.cancel.
	l.Lock()
	taskCtx, cancel := context.WithCancel(ctx)
	l.cancel = cancel
	l.Unlock()
	err := l.core.RunOnce(taskCtx, cfg, nil)
	// Test-only failpoints: delay completion of the load, either
	// unconditionally or only for the comma-separated task names given
	// as the failpoint value (blocks until the task context is cancelled).
	failpoint.Inject("LightningLoadDataSlowDown", nil)
	failpoint.Inject("LightningLoadDataSlowDownByTask", func(val failpoint.Value) {
		tasks := val.(string)
		taskNames := strings.Split(tasks, ",")
		for _, taskName := range taskNames {
			if l.cfg.Name == taskName {
				l.logger.Info("inject failpoint LightningLoadDataSlowDownByTask", zap.String("task", taskName))
				<-taskCtx.Done()
			}
		}
	})
	l.logger.Info("end runLightning")
	return err
}

Expand All @@ -155,13 +198,19 @@ func (l *LightningLoader) restore(ctx context.Context) error {
return err
}
if !l.checkPoint.IsTableFinished(lightningCheckpointDB, lightningCheckpointTable) {
if l.checkPointList != nil {
if err = l.checkPointList.RegisterCheckPoint(ctx, l.workerName, l.cfg.Name); err != nil {
return err
}
}
cfg := lcfg.NewConfig()
if err = cfg.LoadFromGlobal(l.lightningConfig); err != nil {
return err
}
cfg.Routes = l.cfg.RouteRules
cfg.Checkpoint.Driver = lcfg.CheckpointDriverMySQL
cfg.Checkpoint.Schema = config.TiDBLightningCheckpointPrefix + dbutil.ColumnName(l.workerName)
cfg.Checkpoint.Schema = config.TiDBLightningCheckpointPrefix + dbutil.TableName(l.workerName, l.cfg.Name)
cfg.Checkpoint.KeepAfterSuccess = lcfg.CheckpointOrigin
param := common.MySQLConnectParam{
Host: cfg.TiDB.Host,
Port: cfg.TiDB.Port,
Expand All @@ -172,28 +221,36 @@ func (l *LightningLoader) restore(ctx context.Context) error {
TLS: cfg.TiDB.TLS,
}
cfg.Checkpoint.DSN = param.ToDSN()
cfg.TiDB.Vars = make(map[string]string)
if l.cfg.To.Session != nil {
for k, v := range l.cfg.To.Session {
cfg.TiDB.Vars[k] = v
}
}

cfg.TiDB.StrSQLMode = l.cfg.LoaderConfig.SQLMode
cfg.TiDB.Vars = map[string]string{
"time_zone": l.timeZone,
}
if err = cfg.Adjust(ctx); err != nil {
return err
}
l.Lock()
taskCtx, cancel := context.WithCancel(ctx)
l.cancel = cancel
l.Unlock()
err = l.core.RunOnce(taskCtx, cfg, nil)
err = l.runLightning(ctx, cfg)
if err == nil {
err = lightning.CheckpointRemove(ctx, cfg, "all")
}
if err == nil {
l.finish.Store(true)
offsetSQL := l.checkPoint.GenSQL(lightningCheckpointFile, 1)
err = l.toDBConns[0].executeSQL(tctx, []string{offsetSQL})
_ = l.checkPoint.UpdateOffset(lightningCheckpointFile, 1)
} else {
l.logger.Error("failed to runlightning", zap.Error(err))
}
} else {
l.finish.Store(true)
}
if l.cfg.Mode == config.ModeFull {
if err == nil && l.finish.Load() && l.cfg.Mode == config.ModeFull {
if err = delLoadTask(l.cli, l.cfg, l.workerName); err != nil {
return err
}
Expand Down Expand Up @@ -233,6 +290,7 @@ func (l *LightningLoader) Process(ctx context.Context, pr chan pb.ProcessResult)
}

if err := l.restore(ctx); err != nil && !utils.IsContextCanceledError(err) {
l.logger.Error("process error", zap.Error(err))
errs = append(errs, unit.NewProcessError(err))
}
isCanceled := false
Expand All @@ -241,7 +299,7 @@ func (l *LightningLoader) Process(ctx context.Context, pr chan pb.ProcessResult)
isCanceled = true
default:
}
l.logger.Info("lightning load end")
l.logger.Info("lightning load end", zap.Bool("IsCanceled", isCanceled))
pr <- pb.ProcessResult{
IsCanceled: isCanceled,
Errors: errs,
Expand All @@ -261,6 +319,8 @@ func (l *LightningLoader) IsFreshTask(ctx context.Context) (bool, error) {
// Close does graceful shutdown.
// Close does graceful shutdown: pauses the loader, releases checkpoint
// resources, and marks the unit closed.
func (l *LightningLoader) Close() {
	l.Pause()
	// Init can leave either checkpoint unset (e.g. when the
	// ignoreLoadCheckpointErr failpoint suppresses an init error), so guard
	// against nil here — restore() performs the same nil check on
	// checkPointList before using it.
	if l.checkPoint != nil {
		l.checkPoint.Close()
	}
	if l.checkPointList != nil {
		l.checkPointList.Close()
	}
	l.closed.Store(true)
}

Expand Down
5 changes: 3 additions & 2 deletions dm/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func (l *Loader) Init(ctx context.Context) (err error) {

l.logger.Info("loader's sql_mode is", zap.String("sqlmode", lcfg.To.Session["sql_mode"]))

l.toDB, l.toDBConns, err = createConns(tctx, lcfg, l.cfg.PoolSize)
l.toDB, l.toDBConns, err = createConns(tctx, lcfg, lcfg.Name, lcfg.SourceID, l.cfg.PoolSize)
if err != nil {
return err
}
Expand Down Expand Up @@ -1328,7 +1328,8 @@ func (q *jobQueue) startConsumers(handler func(ctx context.Context, job *restore
}
}(baseConn)
session = &DBConn{
cfg: job.loader.cfg,
name: job.loader.cfg.Name,
sourceID: job.loader.cfg.SourceID,
baseConn: baseConn,
resetBaseConnFn: func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error) {
return nil, terror.ErrDBBadConn.Generate("bad connection error restoreData")
Expand Down
Loading

0 comments on commit eb64839

Please sign in to comment.