Skip to content

Commit

Permalink
drop checkpoint database
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Nov 4, 2021
1 parent 54b0e52 commit d5cad3f
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 15 deletions.
15 changes: 15 additions & 0 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sync"
"time"

"github.com/pingcap/ticdc/dm/loader"

"github.com/labstack/echo/v4"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -1422,6 +1424,19 @@ func withHost(addr string) string {

func (s *Server) removeMetaData(ctx context.Context, taskName, metaSchema string, toDBCfg *config.DBConfig) error {
toDBCfg.Adjust()

if toDBCfg != nil {
tctx := tcontext.NewContext(ctx, log.L())
list, err := loader.NewLightningCheckpointList(tctx, *toDBCfg, metaSchema)
if err != nil {
return err
}
err = list.RemoveTaskCheckPoint(tctx, taskName)
if err != nil {
return err
}
list.Close()
}
// clear shard meta data for pessimistic/optimist
err := s.pessimist.RemoveMetaData(taskName)
if err != nil {
Expand Down
100 changes: 99 additions & 1 deletion dm/loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import (
"go.uber.org/zap"
)

const (
lightningCheckpointListName = "lightning_checkpoint_list"
)

// CheckPoint represents checkpoint status.
type CheckPoint interface {
// Load loads all checkpoints recorded before.
Expand Down Expand Up @@ -100,7 +104,7 @@ type RemoteCheckPoint struct {
}

func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) (CheckPoint, error) {
db, dbConns, err := createConns(tctx, cfg, 1)
db, dbConns, err := createConns(tctx, cfg.To, cfg.Name, cfg.SourceID, 1)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -464,3 +468,97 @@ func (cp *RemoteCheckPoint) String() string {
}
return string(bytes)
}

type LightningCheckpointList struct {
db *conn.BaseDB
connMutex sync.Mutex
conn *DBConn
schema string
tableName string
logger log.Logger
}

func NewLightningCheckpointList(tctx *tcontext.Context, cfg config.DBConfig, metaSchema string) (*LightningCheckpointList, error) {
db, dbConns, err := createConns(tctx, cfg, "checkpoint", "checkpoint", 1)
if err != nil {
return nil, err
}
cp := &LightningCheckpointList{
db: db,
conn: dbConns[0],
schema: dbutil.ColumnName(metaSchema),
tableName: dbutil.TableName(metaSchema, lightningCheckpointListName),
logger: tctx.L().WithFields(zap.String("component", "lightning checkpoint database list")),
}
err = cp.prepare(tctx)
if err != nil {
return nil, err
}
return cp, nil
}

func (cp *LightningCheckpointList) prepare(tctx *tcontext.Context) error {
createTable := `CREATE TABLE IF NOT EXISTS %s (
worker_name varchar(255) NOT NULL,
task_name varchar(255) NOT NULL,
PRIMARY KEY (task_name, worker_name)
);
`
sql2 := fmt.Sprintf(createTable, cp.tableName)
cp.connMutex.Lock()
err := cp.conn.executeSQL(tctx, []string{sql2})
cp.connMutex.Unlock()
return terror.WithScope(err, terror.ScopeDownstream)
}

func (cp *LightningCheckpointList) RegisterCheckPoint(tctx *tcontext.Context, workerName, taskName string) error {
sql := fmt.Sprintf("INSERT INGORE INTO %s (`id`, `worker_name`, `task_name`) VALUES(?,?)", cp.tableName)
cp.logger.Info("initial checkpoint record",
zap.String("sql", sql),
zap.String("worker-name", workerName),
zap.String("task-name", taskName))
args := []interface{}{workerName, taskName}
cp.connMutex.Lock()
err := cp.conn.executeSQL(tctx, []string{sql}, args)
cp.connMutex.Unlock()
if err != nil {
return terror.WithScope(terror.Annotate(err, "initialize checkpoint"), terror.ScopeDownstream)
}
return nil
}

func (cp *LightningCheckpointList) RemoveTaskCheckPoint(tctx *tcontext.Context, taskName string) error {
query := fmt.Sprintf("SELECT `worker_name` from %s where `task_name`=?", cp.tableName)
cp.connMutex.Lock()
rows, err := cp.conn.querySQL(tctx, query, taskName)
cp.connMutex.Unlock()
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
defer rows.Close()
var workerName string
for rows.Next() {
err := rows.Scan(&workerName)
if err != nil {
return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
}
cpdb := config.TiDBLightningCheckpointPrefix + dbutil.TableName(workerName, taskName)
sql := fmt.Sprintf("DROP DATABASE IF NOT EXISTS %s", cpdb)
err = cp.conn.executeSQL(tctx, []string{sql})
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
}
query = fmt.Sprintf("DELETE from %s where `task_name`=?", cp.tableName)
cp.connMutex.Lock()
err = cp.conn.executeSQL(tctx, []string{query}, []interface{}{taskName})
cp.connMutex.Unlock()
return terror.WithScope(err, terror.ScopeDownstream)
}

// Close implements CheckPoint.Close.
func (cp *LightningCheckpointList) Close() {
if err := cp.db.Close(); err != nil {
cp.logger.Error("close checkpoint list db", log.ShortError(err))
}
}
17 changes: 10 additions & 7 deletions dm/loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ import (
// DBConn represents a live DB connection
// it's not thread-safe.
type DBConn struct {
cfg *config.SubTaskConfig
name string
sourceID string
baseConn *conn.BaseConn

// generate new BaseConn and close old one
Expand Down Expand Up @@ -89,7 +90,7 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf
cost := time.Since(startTime)
// duration seconds
ds := cost.Seconds()
queryHistogram.WithLabelValues(conn.cfg.Name, conn.cfg.SourceID).Observe(ds)
queryHistogram.WithLabelValues(conn.name, conn.sourceID).Observe(ds)
if ds > 1 {
ctx.L().Warn("query statement too slow",
zap.Duration("cost time", cost),
Expand Down Expand Up @@ -123,7 +124,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
FirstRetryDuration: 2 * time.Second,
BackoffStrategy: retry.LinearIncrease,
IsRetryableFn: func(retryTime int, err error) bool {
tidbExecutionErrorCounter.WithLabelValues(conn.cfg.Name, conn.cfg.SourceID).Inc()
tidbExecutionErrorCounter.WithLabelValues(conn.name, conn.sourceID).Inc()
if retry.IsConnectionError(err) {
err = conn.resetConn(ctx)
if err != nil {
Expand Down Expand Up @@ -151,7 +152,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
params,
func(ctx *tcontext.Context) (interface{}, error) {
startTime := time.Now()
_, err := conn.baseConn.ExecuteSQL(ctx, stmtHistogram, conn.cfg.Name, queries, args...)
_, err := conn.baseConn.ExecuteSQL(ctx, stmtHistogram, conn.name, queries, args...)
failpoint.Inject("LoadExecCreateTableFailed", func(val failpoint.Value) {
errCode, err1 := strconv.ParseUint(val.(string), 10, 16)
if err1 != nil {
Expand Down Expand Up @@ -196,8 +197,10 @@ func (conn *DBConn) resetConn(tctx *tcontext.Context) error {
return nil
}

func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, workerCount int) (*conn.BaseDB, []*DBConn, error) {
baseDB, err := conn.DefaultDBProvider.Apply(cfg.To)
func createConns(tctx *tcontext.Context, cfg config.DBConfig,
name, sourceID string,
workerCount int) (*conn.BaseDB, []*DBConn, error) {
baseDB, err := conn.DefaultDBProvider.Apply(cfg)
if err != nil {
return nil, nil, terror.WithScope(err, terror.ScopeDownstream)
}
Expand All @@ -218,7 +221,7 @@ func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, workerCount
}
return baseDB.GetBaseConn(tctx.Context())
}
conns = append(conns, &DBConn{baseConn: baseConn, cfg: cfg, resetBaseConnFn: resetBaseConnFn})
conns = append(conns, &DBConn{baseConn: baseConn, name: name, sourceID: sourceID, resetBaseConnFn: resetBaseConnFn})
}
return baseDB, conns, nil
}
Expand Down
24 changes: 19 additions & 5 deletions dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type LightningLoader struct {
cfg *config.SubTaskConfig
cli *clientv3.Client
checkPoint CheckPoint
checkPointList *LightningCheckpointList
workerName string
logger log.Logger
core *lightning.Lightning
Expand Down Expand Up @@ -114,29 +115,34 @@ func (l *LightningLoader) Type() pb.UnitType {
func (l *LightningLoader) Init(ctx context.Context) (err error) {
tctx := tcontext.NewContext(ctx, l.logger)
checkpoint, err := newRemoteCheckPoint(tctx, l.cfg, l.checkpointID())
if err == nil {
l.checkPoint = checkpoint
checkpointList, err1 := NewLightningCheckpointList(tctx, l.cfg.To, l.cfg.MetaSchema)
err = err1
l.checkPointList = checkpointList
}
failpoint.Inject("ignoreLoadCheckpointErr", func(_ failpoint.Value) {
l.logger.Info("", zap.String("failpoint", "ignoreLoadCheckpointErr"))
err = nil
})
l.checkPoint = checkpoint
if err != nil {
return err
}
toCfg, err := l.cfg.Clone()
if err != nil {
return err
}
config.AdjustTargetDBTimeZone(&toCfg.To)
l.toDB, l.toDBConns, err = createConns(tctx, toCfg, 1)
l.toDB, l.toDBConns, err = createConns(tctx, toCfg.To, toCfg.Name, toCfg.SourceID, 1)
return err
}

func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) error {
l.logger.Info("start runLightning")
l.Lock()
taskCtx, cancel := context.WithCancel(ctx)
l.cancel = cancel
l.Unlock()
l.logger.Info("start RunOnce")
err := l.core.RunOnce(taskCtx, cfg, nil)
l.logger.Info("end RunOnce")
failpoint.Inject("LightningLoadDataSlowDown", nil)
failpoint.Inject("LightningLoadDataSlowDownByTask", func(val failpoint.Value) {
tasks := val.(string)
Expand Down Expand Up @@ -174,6 +180,12 @@ func (l *LightningLoader) restore(ctx context.Context) error {
return err
}
if !l.checkPoint.IsTableFinished(lightningCheckpointDB, lightningCheckpointTable) {
if l.checkPointList != nil {
tctx := tcontext.NewContext(ctx, l.logger)
if err = l.checkPointList.RegisterCheckPoint(tctx, l.workerName, l.cfg.Name); err != nil {
return err
}
}
cfg := lcfg.NewConfig()
if err = cfg.LoadFromGlobal(l.lightningConfig); err != nil {
return err
Expand Down Expand Up @@ -281,6 +293,8 @@ func (l *LightningLoader) IsFreshTask(ctx context.Context) (bool, error) {
// Close does graceful shutdown.
func (l *LightningLoader) Close() {
l.Pause()
l.checkPoint.Close()
l.checkPointList.Close()
l.closed.Store(true)
}

Expand Down
5 changes: 3 additions & 2 deletions dm/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (l *Loader) Init(ctx context.Context) (err error) {

l.logger.Info("loader's sql_mode is", zap.String("sqlmode", lcfg.To.Session["sql_mode"]))

l.toDB, l.toDBConns, err = createConns(tctx, lcfg, l.cfg.PoolSize)
l.toDB, l.toDBConns, err = createConns(tctx, lcfg.To, lcfg.Name, lcfg.SourceID, l.cfg.PoolSize)
if err != nil {
return err
}
Expand Down Expand Up @@ -1320,7 +1320,8 @@ func (q *jobQueue) startConsumers(handler func(ctx context.Context, job *restore
}
}(baseConn)
session = &DBConn{
cfg: job.loader.cfg,
name: job.loader.cfg.Name,
sourceID: job.loader.cfg.SourceID,
baseConn: baseConn,
resetBaseConnFn: func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error) {
return nil, terror.ErrDBBadConn.Generate("bad connection error restoreData")
Expand Down

0 comments on commit d5cad3f

Please sign in to comment.