diff --git a/dm/cmd/dm-worker/main.go b/dm/cmd/dm-worker/main.go index 410f14553bf..ce567ba97e3 100644 --- a/dm/cmd/dm-worker/main.go +++ b/dm/cmd/dm-worker/main.go @@ -55,7 +55,6 @@ func main() { common.PrintLinesf("init logger error %s", terror.Message(err)) os.Exit(2) } - lightningLog.SetAppLogger(log.L().Logger) utils.LogHTTPProxies(true) @@ -65,6 +64,8 @@ func main() { lg, r, _ := globalLog.InitLogger(conf) lg = lg.With(zap.String("component", "ddl tracker")) globalLog.ReplaceGlobals(lg, r) + lightningLogger := lg.With(zap.String("component", "lightning")) + lightningLog.SetAppLogger(lightningLogger) utils.PrintInfo("dm-worker", func() { log.L().Info("", zap.Stringer("dm-worker config", cfg)) diff --git a/dm/dm/master/server.go b/dm/dm/master/server.go index dd1c9e569fa..4452cc90855 100644 --- a/dm/dm/master/server.go +++ b/dm/dm/master/server.go @@ -1418,6 +1418,7 @@ func withHost(addr string) string { func (s *Server) removeMetaData(ctx context.Context, taskName, metaSchema string, toDBCfg *config.DBConfig) error { toDBCfg.Adjust() + // clear shard meta data for pessimistic/optimist err := s.pessimist.RemoveMetaData(taskName) if err != nil { diff --git a/dm/loader/checkpoint.go b/dm/loader/checkpoint.go index c3f380ce1b7..c1387cbee72 100644 --- a/dm/loader/checkpoint.go +++ b/dm/loader/checkpoint.go @@ -14,6 +14,7 @@ package loader import ( + "context" "encoding/json" "fmt" "strings" @@ -31,6 +32,10 @@ import ( "go.uber.org/zap" ) +const ( + LightningCheckpointListName = "lightning_checkpoint_list" +) + // CheckPoint represents checkpoint status. type CheckPoint interface { // Load loads all checkpoints recorded before. @@ -100,7 +105,7 @@ type RemoteCheckPoint struct { } func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) (CheckPoint, error) { - db, dbConns, err := createConns(tctx, cfg, 1) + db, dbConns, err := createConns(tctx, cfg, cfg.Name, cfg.SourceID, 1) if err != nil { return nil, err } @@ -464,3 +469,99 @@ func (cp *RemoteCheckPoint) String() string { } return string(bytes) } + +type LightningCheckpointList struct { + db *conn.BaseDB + schema string + tableName string + logger log.Logger +} + +func NewLightningCheckpointList(db *conn.BaseDB, metaSchema string) *LightningCheckpointList { + return &LightningCheckpointList{ + db: db, + schema: dbutil.ColumnName(metaSchema), + tableName: dbutil.TableName(metaSchema, LightningCheckpointListName), + logger: log.L().WithFields(zap.String("component", "lightning checkpoint database list")), + } +} + +func (cp *LightningCheckpointList) Prepare(ctx context.Context) error { + connection, err := cp.db.GetBaseConn(ctx) + if err != nil { + return terror.WithScope(terror.Annotate(err, "initialize connection when prepare"), terror.ScopeDownstream) + } + createSchema := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", cp.schema) + tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint"))) + _, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{createSchema}) + if err != nil { + return err + } + createTable := `CREATE TABLE IF NOT EXISTS %s ( + worker_name varchar(255) NOT NULL, + task_name varchar(255) NOT NULL, + PRIMARY KEY (task_name, worker_name) + ); +` + sql2 := fmt.Sprintf(createTable, cp.tableName) + _, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql2}) + return terror.WithScope(err, terror.ScopeDownstream) +} + +func (cp *LightningCheckpointList) RegisterCheckPoint(ctx context.Context, workerName, taskName string) error { + 
connection, err := cp.db.GetBaseConn(ctx) + if err != nil { + return terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream) + } + + sql := fmt.Sprintf("INSERT IGNORE INTO %s (`worker_name`, `task_name`) VALUES(?,?)", cp.tableName) + cp.logger.Info("initial checkpoint record", + zap.String("sql", sql), + zap.String("worker-name", workerName), + zap.String("task-name", taskName)) + args := []interface{}{workerName, taskName} + tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint"))) + _, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql}, args) + if err != nil { + return terror.WithScope(terror.Annotate(err, "initialize checkpoint"), terror.ScopeDownstream) + } + return nil +} + +func (cp *LightningCheckpointList) RemoveTaskCheckPoint(ctx context.Context, taskName string) error { + connection, err := cp.db.GetBaseConn(ctx) + if err != nil { + return terror.WithScope(terror.Annotate(err, "initialize connection"), terror.ScopeDownstream) + } + + tctx := tcontext.NewContext(ctx, log.With(zap.String("job", "lightning-checkpoint"))) + query := fmt.Sprintf("SELECT `worker_name` from %s where `task_name`=?", cp.tableName) + rows, err := connection.QuerySQL(tctx, query, taskName) + if err != nil { + return terror.WithScope(err, terror.ScopeDownstream) + } + defer rows.Close() + var workerName string + for rows.Next() { + err = rows.Scan(&workerName) + if err != nil { + return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream) + } + cpdb := config.TiDBLightningCheckpointPrefix + dbutil.TableName(workerName, taskName) + sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", cpdb) + _, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{sql}) + if err != nil { + return terror.WithScope(err, terror.ScopeDownstream) + } + } + query = fmt.Sprintf("DELETE from %s where `task_name`=?", cp.tableName) + _, err = connection.ExecuteSQL(tctx, nil, "lightning-checkpoint", []string{query}, []interface{}{taskName}) + return terror.WithScope(err, terror.ScopeDownstream) +} + +// Close implements CheckPoint.Close. +func (cp *LightningCheckpointList) Close() { + if err := cp.db.Close(); err != nil { + cp.logger.Error("close checkpoint list db", log.ShortError(err)) + } +} diff --git a/dm/loader/db.go b/dm/loader/db.go index d588c6a3626..a9a919aa8a9 100644 --- a/dm/loader/db.go +++ b/dm/loader/db.go @@ -37,7 +37,8 @@ import ( // DBConn represents a live DB connection // it's not thread-safe. type DBConn struct { - cfg *config.SubTaskConfig + name string + sourceID string baseConn *conn.BaseConn // generate new BaseConn and close old one @@ -89,7 +90,7 @@ func (conn *DBConn) querySQL(ctx *tcontext.Context, query string, args ...interf cost := time.Since(startTime) // duration seconds ds := cost.Seconds() - queryHistogram.WithLabelValues(conn.cfg.Name, conn.cfg.SourceID).Observe(ds) + queryHistogram.WithLabelValues(conn.name, conn.sourceID).Observe(ds) if ds > 1 { ctx.L().Warn("query statement too slow", zap.Duration("cost time", cost), @@ -123,7 +124,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ...
FirstRetryDuration: 2 * time.Second, BackoffStrategy: retry.LinearIncrease, IsRetryableFn: func(retryTime int, err error) bool { - tidbExecutionErrorCounter.WithLabelValues(conn.cfg.Name, conn.cfg.SourceID).Inc() + tidbExecutionErrorCounter.WithLabelValues(conn.name, conn.sourceID).Inc() if retry.IsConnectionError(err) { err = conn.resetConn(ctx) if err != nil { @@ -151,7 +152,7 @@ func (conn *DBConn) executeSQL(ctx *tcontext.Context, queries []string, args ... params, func(ctx *tcontext.Context) (interface{}, error) { startTime := time.Now() - _, err := conn.baseConn.ExecuteSQL(ctx, stmtHistogram, conn.cfg.Name, queries, args...) + _, err := conn.baseConn.ExecuteSQL(ctx, stmtHistogram, conn.name, queries, args...) failpoint.Inject("LoadExecCreateTableFailed", func(val failpoint.Value) { errCode, err1 := strconv.ParseUint(val.(string), 10, 16) if err1 != nil { @@ -196,7 +197,9 @@ func (conn *DBConn) resetConn(tctx *tcontext.Context) error { return nil } -func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, workerCount int) (*conn.BaseDB, []*DBConn, error) { +func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, + name, sourceID string, + workerCount int) (*conn.BaseDB, []*DBConn, error) { baseDB, err := conn.DefaultDBProvider.Apply(&cfg.To) if err != nil { return nil, nil, terror.WithScope(err, terror.ScopeDownstream) @@ -218,7 +221,7 @@ func createConns(tctx *tcontext.Context, cfg *config.SubTaskConfig, workerCount } return baseDB.GetBaseConn(tctx.Context()) } - conns = append(conns, &DBConn{baseConn: baseConn, cfg: cfg, resetBaseConnFn: resetBaseConnFn}) + conns = append(conns, &DBConn{baseConn: baseConn, name: name, sourceID: sourceID, resetBaseConnFn: resetBaseConnFn}) } return baseDB, conns, nil } diff --git a/dm/loader/lightning.go b/dm/loader/lightning.go index 8ef21d367a6..8f6265ce102 100644 --- a/dm/loader/lightning.go +++ b/dm/loader/lightning.go @@ -16,6 +16,7 @@ package loader import ( "context" "path/filepath" + "strings" "sync" "github.com/docker/go-units" @@ -53,6 +54,7 @@ type LightningLoader struct { cfg *config.SubTaskConfig cli *clientv3.Client checkPoint CheckPoint + checkPointList *LightningCheckpointList workerName string logger log.Logger core *lightning.Lightning @@ -114,13 +116,33 @@ func (l *LightningLoader) Type() pb.UnitType { // if fail, it should not call l.Close. 
func (l *LightningLoader) Init(ctx context.Context) (err error) { tctx := tcontext.NewContext(ctx, l.logger) + toCfg, err := l.cfg.Clone() + if err != nil { + return err + } + l.toDB, l.toDBConns, err = createConns(tctx, l.cfg, toCfg.Name, toCfg.SourceID, 1) + if err != nil { + return err + } + checkpoint, err := newRemoteCheckPoint(tctx, l.cfg, l.checkpointID()) + if err == nil { + l.checkPoint = checkpoint + checkpointList := NewLightningCheckpointList(l.toDB, l.cfg.MetaSchema) + err1 := checkpointList.Prepare(ctx) + if err1 == nil { + l.checkPointList = checkpointList + } + err = err1 + } failpoint.Inject("ignoreLoadCheckpointErr", func(_ failpoint.Value) { l.logger.Info("", zap.String("failpoint", "ignoreLoadCheckpointErr")) err = nil }) - l.checkPoint = checkpoint - l.toDB, l.toDBConns, err = createConns(tctx, l.cfg, 1) + if err != nil { + return err + } + timeZone := l.cfg.Timezone if len(timeZone) == 0 { var err1 error @@ -130,6 +152,27 @@ func (l *LightningLoader) Init(ctx context.Context) (err error) { } } l.timeZone = timeZone + return nil +} + +func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) error { + l.Lock() + taskCtx, cancel := context.WithCancel(ctx) + l.cancel = cancel + l.Unlock() + err := l.core.RunOnce(taskCtx, cfg, nil) + failpoint.Inject("LightningLoadDataSlowDown", nil) + failpoint.Inject("LightningLoadDataSlowDownByTask", func(val failpoint.Value) { + tasks := val.(string) + taskNames := strings.Split(tasks, ",") + for _, taskName := range taskNames { + if l.cfg.Name == taskName { + l.logger.Info("inject failpoint LightningLoadDataSlowDownByTask", zap.String("task", taskName)) + <-taskCtx.Done() + } + } + }) + l.logger.Info("end runLightning") return err } @@ -155,13 +198,19 @@ func (l *LightningLoader) restore(ctx context.Context) error { return err } if !l.checkPoint.IsTableFinished(lightningCheckpointDB, lightningCheckpointTable) { + if l.checkPointList != nil { + if err = l.checkPointList.RegisterCheckPoint(ctx, l.workerName, l.cfg.Name); err != nil { + return err + } + } cfg := lcfg.NewConfig() if err = cfg.LoadFromGlobal(l.lightningConfig); err != nil { return err } cfg.Routes = l.cfg.RouteRules cfg.Checkpoint.Driver = lcfg.CheckpointDriverMySQL - cfg.Checkpoint.Schema = config.TiDBLightningCheckpointPrefix + dbutil.ColumnName(l.workerName) + cfg.Checkpoint.Schema = config.TiDBLightningCheckpointPrefix + dbutil.TableName(l.workerName, l.cfg.Name) + cfg.Checkpoint.KeepAfterSuccess = lcfg.CheckpointOrigin param := common.MySQLConnectParam{ Host: cfg.TiDB.Host, Port: cfg.TiDB.Port, @@ -172,6 +221,13 @@ func (l *LightningLoader) restore(ctx context.Context) error { TLS: cfg.TiDB.TLS, } cfg.Checkpoint.DSN = param.ToDSN() + cfg.TiDB.Vars = make(map[string]string) + if l.cfg.To.Session != nil { + for k, v := range l.cfg.To.Session { + cfg.TiDB.Vars[k] = v + } + } + cfg.TiDB.StrSQLMode = l.cfg.LoaderConfig.SQLMode cfg.TiDB.Vars = map[string]string{ "time_zone": l.timeZone, @@ -179,21 +235,22 @@ func (l *LightningLoader) restore(ctx context.Context) error { if err = cfg.Adjust(ctx); err != nil { return err } - l.Lock() - taskCtx, cancel := context.WithCancel(ctx) - l.cancel = cancel - l.Unlock() - err = l.core.RunOnce(taskCtx, cfg, nil) + err = l.runLightning(ctx, cfg) + if err == nil { + err = lightning.CheckpointRemove(ctx, cfg, "all") + } if err == nil { l.finish.Store(true) offsetSQL := l.checkPoint.GenSQL(lightningCheckpointFile, 1) err = l.toDBConns[0].executeSQL(tctx, []string{offsetSQL}) _ = 
l.checkPoint.UpdateOffset(lightningCheckpointFile, 1) + } else { + l.logger.Error("failed to runlightning", zap.Error(err)) } } else { l.finish.Store(true) } - if l.cfg.Mode == config.ModeFull { + if err == nil && l.finish.Load() && l.cfg.Mode == config.ModeFull { if err = delLoadTask(l.cli, l.cfg, l.workerName); err != nil { return err } @@ -233,6 +290,7 @@ func (l *LightningLoader) Process(ctx context.Context, pr chan pb.ProcessResult) } if err := l.restore(ctx); err != nil && !utils.IsContextCanceledError(err) { + l.logger.Error("process error", zap.Error(err)) errs = append(errs, unit.NewProcessError(err)) } isCanceled := false @@ -241,7 +299,7 @@ func (l *LightningLoader) Process(ctx context.Context, pr chan pb.ProcessResult) isCanceled = true default: } - l.logger.Info("lightning load end") + l.logger.Info("lightning load end", zap.Bool("IsCanceled", isCanceled)) pr <- pb.ProcessResult{ IsCanceled: isCanceled, Errors: errs, @@ -261,6 +319,8 @@ func (l *LightningLoader) IsFreshTask(ctx context.Context) (bool, error) { // Close does graceful shutdown. func (l *LightningLoader) Close() { l.Pause() + l.checkPoint.Close() + l.checkPointList.Close() l.closed.Store(true) } diff --git a/dm/loader/loader.go b/dm/loader/loader.go index 2b80fba6ac7..38fd086dd64 100644 --- a/dm/loader/loader.go +++ b/dm/loader/loader.go @@ -562,7 +562,7 @@ func (l *Loader) Init(ctx context.Context) (err error) { l.logger.Info("loader's sql_mode is", zap.String("sqlmode", lcfg.To.Session["sql_mode"])) - l.toDB, l.toDBConns, err = createConns(tctx, lcfg, l.cfg.PoolSize) + l.toDB, l.toDBConns, err = createConns(tctx, lcfg, lcfg.Name, lcfg.SourceID, l.cfg.PoolSize) if err != nil { return err } @@ -1328,7 +1328,8 @@ func (q *jobQueue) startConsumers(handler func(ctx context.Context, job *restore } }(baseConn) session = &DBConn{ - cfg: job.loader.cfg, + name: job.loader.cfg.Name, + sourceID: job.loader.cfg.SourceID, baseConn: baseConn, resetBaseConnFn: func(*tcontext.Context, *conn.BaseConn) (*conn.BaseConn, error) { return nil, terror.ErrDBBadConn.Generate("bad connection error restoreData") diff --git a/dm/tests/lightning_load_task/conf/dm-task.yaml b/dm/tests/lightning_load_task/conf/dm-task.yaml new file mode 100644 index 00000000000..b29aa864d12 --- /dev/null +++ b/dm/tests/lightning_load_task/conf/dm-task.yaml @@ -0,0 +1,50 @@ +--- +name: load_task1 +task-mode: all +is-sharding: false +meta-schema: "dm_meta" +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 + +target-database: + host: "127.0.0.1" + port: 4000 + user: "test" + password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=" + +mysql-instances: + - source-id: "mysql-replica-01" + block-allow-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + + - source-id: "mysql-replica-02" + block-allow-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +tidb: + backend: "tidb" + +block-allow-list: + instance: + do-dbs: ["load_task1"] + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/dm/tests/lightning_load_task/conf/dm-task2.yaml b/dm/tests/lightning_load_task/conf/dm-task2.yaml new file mode 100644 index 00000000000..a751da96d85 --- /dev/null +++ b/dm/tests/lightning_load_task/conf/dm-task2.yaml @@ -0,0 +1,50 @@ +--- +name: load_task2 +task-mode: all +is-sharding: false 
+meta-schema: "dm_meta" +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 + +target-database: + host: "127.0.0.1" + port: 4000 + user: "test" + password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=" + +mysql-instances: + - source-id: "mysql-replica-01" + block-allow-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + + - source-id: "mysql-replica-02" + block-allow-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +tidb: + backend: "tidb" + +block-allow-list: + instance: + do-dbs: ["load_task2"] + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/dm/tests/lightning_load_task/conf/dm-task3.yaml b/dm/tests/lightning_load_task/conf/dm-task3.yaml new file mode 100644 index 00000000000..e6395139e8e --- /dev/null +++ b/dm/tests/lightning_load_task/conf/dm-task3.yaml @@ -0,0 +1,44 @@ +--- +name: load_task3 +task-mode: all +is-sharding: false +meta-schema: "dm_meta" +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 + +target-database: + host: "127.0.0.1" + port: 4000 + user: "test" + password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=" + +mysql-instances: + - source-id: "mysql-replica-02" + block-allow-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +block-allow-list: + instance: + do-dbs: ["load_task3"] + +tidb: + backend: "tidb" + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/dm/tests/lightning_load_task/conf/dm-task4.yaml b/dm/tests/lightning_load_task/conf/dm-task4.yaml new file mode 100644 index 00000000000..d2abd5092f3 --- /dev/null +++ b/dm/tests/lightning_load_task/conf/dm-task4.yaml @@ -0,0 +1,44 @@ +--- +name: load_task4 +task-mode: all +is-sharding: false +meta-schema: "dm_meta" +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 + +target-database: + host: "127.0.0.1" + port: 4000 + user: "test" + password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=" + +mysql-instances: + - source-id: "mysql-replica-01" + block-allow-list: "instance" + mydumper-config-name: "global" + loader-config-name: "global" + syncer-config-name: "global" + +block-allow-list: + instance: + do-dbs: ["load_task4"] + +mydumpers: + global: + threads: 4 + chunk-filesize: 64 + skip-tz-utc: true + extra-args: "" + +tidb: + backend: "tidb" + +loaders: + global: + pool-size: 16 + dir: "./dumped_data" + +syncers: + global: + worker-count: 16 + batch: 100 diff --git a/dm/tests/lightning_load_task/run.sh b/dm/tests/lightning_load_task/run.sh new file mode 100755 index 00000000000..f9eb0a57c47 --- /dev/null +++ b/dm/tests/lightning_load_task/run.sh @@ -0,0 +1,243 @@ +#!/bin/bash + +set -eu + +cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +DATA_DIR=$cur/../load_task/data +CONF_DIR=$cur/../load_task/conf +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME +API_VERSION="v1alpha1" +WORKER1="worker1" +WORKER2="worker2" +WORKER3="worker3" + +function test_worker_restart() { + echo "test worker restart" + # worker1 offline + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + + # source1 bound to worker3 + run_dm_ctl_with_retry 
$WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member -w -n worker3" \ + "\"stage\": \"bound\"" 1 \ + "\"source\": \"mysql-replica-01\"" 1 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member -w -n worker1" \ + "\"stage\": \"offline\"" 1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task1" \ + "different worker in load stage, previous worker: $WORKER1, current worker: $WORKER3" 1 \ + "Please check if the previous worker is online." 1 + + # worker1 online + export GO_FAILPOINTS="github.com/pingcap/ticdc/dm/loader/LightningLoadDataSlowDownByTask=return(\"load_task1\")" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $CONF_DIR/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + # transfer to worker1 + check_log_contain_with_retry 'transfer source and worker.*worker1.*worker3.*mysql-replica-01' $WORK_DIR/master/log/dm-master.log + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member -w -n worker3" \ + "\"stage\": \"free\"" 1 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member -w -n worker1" \ + "\"stage\": \"bound\"" 1 \ + "\"source\": \"mysql-replica-01\"" 1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task1" \ + "\"unit\": \"Load\"" 1 \ + "\"unit\": \"Sync\"" 1 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task2" \ + "\"unit\": \"Load\"" 1 \ + "\"unit\": \"Sync\"" 1 +} + +# almost never happen since user hardly start a load task after another load task failed. +function test_transfer_two_sources() { + echo "test_transfer_two_sources" + # worker2 offline + ps aux | grep dm-worker2 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER2_PORT 20 + + # source2 bound to worker3 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member -w -n worker3" \ + "\"stage\": \"bound\"" 1 \ + "\"source\": \"mysql-replica-02\"" 1 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task2" \ + "different worker in load stage, previous worker: $WORKER2, current worker: $WORKER3" 1 + + # start load task for worker3 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $cur/conf/dm-task3.yaml --remove-meta" \ + "\"result\": true" 2 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task3" \ + "\"unit\": \"Load\"" 1 + + # worker2 online + export GO_FAILPOINTS="github.com/pingcap/ticdc/dm/loader/LightningLoadDataSlowDown=sleep(15000)" + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $CONF_DIR/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + # worker2 free since (worker3, source2) has load task(load_task3) + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member -w -n worker2" \ + "\"stage\": \"free\"" 1 + + # worker1 offline + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + + # source1 bound to worker2 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member -w -n worker2" \ + "\"stage\": \"bound\"" 1 \ + "\"source\": \"mysql-replica-01\"" 1 + + # start load_task4 on worker2 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $cur/conf/dm-task4.yaml --remove-meta" \ + "\"result\": true" 2 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task4" \ + "\"unit\": \"Load\"" 1 + + # 
worker1 online + export GO_FAILPOINTS="" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $CONF_DIR/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + # worker1 free since (worker2, source1) has load task(load_task4) + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member -w -n worker1" \ + "\"stage\": \"free\"" 1 + + # now, worker2 waiting worker3 finish load_task3, worker1 waiting worker2 finish load_task4 + # worker3 offline + ps aux | grep dm-worker3 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER3_PORT 20 + + # source2 bound to worker1 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member -w -n worker1" \ + "\"stage\": \"bound\"" 1 \ + "\"source\": \"mysql-replica-02\"" 1 + + # (worker1, source2), (worker2, source1) + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task1" \ + "different worker in load stage, previous worker: $WORKER1, current worker: $WORKER2" 1 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task2" \ + "different worker in load stage, previous worker: $WORKER2, current worker: $WORKER1" 1 + + # worker2 finish load_task4 + # master transfer (worker1, source2), (worker2, source1) to (worker1, source1), (worker2, source2) + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member -w -n worker1" \ + "\"stage\": \"bound\"" 1 \ + "\"source\": \"mysql-replica-01\"" 1 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member -w -n worker2" \ + "\"stage\": \"bound\"" 1 \ + "\"source\": \"mysql-replica-02\"" 1 + + # task1, 2, 4 running, task3 fail + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status" \ + "\"taskStatus\": \"Running\"" 3 \ + "taskStatus.*Error" 1 + + # worker3 online + export GO_FAILPOINTS="" + run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $CONF_DIR/dm-worker3.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT + + # source2 bound to worker3 since load_task3 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "list-member -w -n worker2" \ + "\"stage\": \"bound\"" 1 \ + "\"source\": \"mysql-replica-02\"" 1 + + # all task running + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status" \ + "\"taskStatus\": \"Running\"" 4 +} + +function run() { + echo "import prepare data" + run_sql_file $DATA_DIR/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + check_contains 'Query OK, 2 rows affected' + run_sql_file $DATA_DIR/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_contains 'Query OK, 3 rows affected' + + echo "start DM master, workers and sources" + run_dm_master $WORK_DIR/master $MASTER_PORT1 $CONF_DIR/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1 + + # worker1 loading load_task1 + export GO_FAILPOINTS="github.com/pingcap/ticdc/dm/loader/LightningLoadDataSlowDownByTask=return(\"load_task1\")" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $CONF_DIR/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + cp $CONF_DIR/source1.yaml $WORK_DIR/source1.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + # worker2 loading load_task2 + export GO_FAILPOINTS="github.com/pingcap/ticdc/dm/loader/LightningLoadDataSlowDownByTask=return(\"load_task2\")" + 
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $CONF_DIR/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + cp $CONF_DIR/source2.yaml $WORK_DIR/source2.yaml + sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml + dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + + # worker3 loading load_task3 + export GO_FAILPOINTS="github.com/pingcap/ticdc/dm/loader/LightningLoadDataSlowDownByTask=return(\"load_task3\")" + run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $CONF_DIR/dm-worker3.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT + + echo "start DM task" + dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta" + dmctl_start_task "$cur/conf/dm-task2.yaml" "--remove-meta" + + check_log_contain_with_retry 'inject failpoint LightningLoadDataSlowDownByTask' $WORK_DIR/worker1/log/dm-worker.log + check_log_contain_with_retry 'inject failpoint LightningLoadDataSlowDownByTask' $WORK_DIR/worker2/log/dm-worker.log + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task1" \ + "\"unit\": \"Load\"" 1 \ + "\"unit\": \"Sync\"" 1 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status load_task2" \ + "\"unit\": \"Load\"" 1 \ + "\"unit\": \"Sync\"" 1 + + test_worker_restart + + test_transfer_two_sources + + run_sql_file $DATA_DIR/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $DATA_DIR/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + check_sync_diff $WORK_DIR $CONF_DIR/diff_config1.toml + check_sync_diff $WORK_DIR $CONF_DIR/diff_config2.toml + check_sync_diff $WORK_DIR $CONF_DIR/diff_config3.toml + check_sync_diff $WORK_DIR $CONF_DIR/diff_config4.toml +} + +cleanup_data load_task1 +cleanup_data load_task2 +cleanup_data load_task3 +cleanup_data load_task4 +# also cleanup dm processes in case of last run failed +cleanup_process $* +run $* +cleanup_process $* + +echo "[$(date)] <<<<<< test case $TEST_NAME success! 
>>>>>>" diff --git a/dm/tests/load_task/conf/dm-task.yaml b/dm/tests/load_task/conf/dm-task.yaml index 62ff585d4a2..32cacf0379a 100644 --- a/dm/tests/load_task/conf/dm-task.yaml +++ b/dm/tests/load_task/conf/dm-task.yaml @@ -3,7 +3,6 @@ name: load_task1 task-mode: all is-sharding: false meta-schema: "dm_meta" -# enable-heartbeat: true heartbeat-update-interval: 1 heartbeat-report-interval: 1 diff --git a/dm/tests/load_task/conf/dm-task2.yaml b/dm/tests/load_task/conf/dm-task2.yaml index 6352d396494..14c8b07645d 100644 --- a/dm/tests/load_task/conf/dm-task2.yaml +++ b/dm/tests/load_task/conf/dm-task2.yaml @@ -3,7 +3,6 @@ name: load_task2 task-mode: all is-sharding: false meta-schema: "dm_meta" -# enable-heartbeat: true heartbeat-update-interval: 1 heartbeat-report-interval: 1 diff --git a/dm/tests/load_task/conf/dm-task3.yaml b/dm/tests/load_task/conf/dm-task3.yaml index 133d33b4a18..0a06a4eabfb 100644 --- a/dm/tests/load_task/conf/dm-task3.yaml +++ b/dm/tests/load_task/conf/dm-task3.yaml @@ -3,7 +3,6 @@ name: load_task3 task-mode: all is-sharding: false meta-schema: "dm_meta" -# enable-heartbeat: true heartbeat-update-interval: 1 heartbeat-report-interval: 1 diff --git a/dm/tests/load_task/conf/dm-task4.yaml b/dm/tests/load_task/conf/dm-task4.yaml index e112ea86b3c..7e153eb4c49 100644 --- a/dm/tests/load_task/conf/dm-task4.yaml +++ b/dm/tests/load_task/conf/dm-task4.yaml @@ -3,7 +3,6 @@ name: load_task4 task-mode: all is-sharding: false meta-schema: "dm_meta" -# enable-heartbeat: true heartbeat-update-interval: 1 heartbeat-report-interval: 1 diff --git a/dm/tests/tls/conf/dm-task.yaml b/dm/tests/tls/conf/dm-task.yaml index dd8d2dedc69..932ddac58ec 100644 --- a/dm/tests/tls/conf/dm-task.yaml +++ b/dm/tests/tls/conf/dm-task.yaml @@ -42,5 +42,5 @@ syncers: worker-count: 16 batch: 100 -tidb: - backend: "tidb" +#tidb: +# backend: "tidb"