Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM: add checkpoint test for lightning-loader #3220

Merged
merged 21 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dm/cmd/dm-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func main() {
common.PrintLinesf("init logger error %s", terror.Message(err))
os.Exit(2)
}
lightningLog.SetAppLogger(log.L().Logger)

utils.LogHTTPProxies(true)

Expand All @@ -65,6 +64,7 @@ func main() {
lg, r, _ := globalLog.InitLogger(conf)
lg = lg.With(zap.String("component", "ddl tracker"))
globalLog.ReplaceGlobals(lg, r)
lightningLog.SetAppLogger(log.L().Logger)
Little-Wallace marked this conversation as resolved.
Show resolved Hide resolved

utils.PrintInfo("dm-worker", func() {
log.L().Info("", zap.Stringer("dm-worker config", cfg))
Expand Down
84 changes: 65 additions & 19 deletions dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,29 @@ package loader
import (
"context"
"path/filepath"
"strings"
"sync"

"github.com/docker/go-units"
"github.com/pingcap/ticdc/dm/dm/config"
"github.com/pingcap/ticdc/dm/dm/pb"
"github.com/pingcap/ticdc/dm/dm/unit"
"github.com/pingcap/ticdc/dm/pkg/binlog"
"github.com/pingcap/ticdc/dm/pkg/conn"
tcontext "github.com/pingcap/ticdc/dm/pkg/context"
"github.com/pingcap/ticdc/dm/pkg/log"
"github.com/pingcap/ticdc/dm/pkg/utils"

Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb/br/pkg/lightning"
"github.com/pingcap/tidb/br/pkg/lightning/common"
lcfg "github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/parser/mysql"

"github.com/docker/go-units"
"go.etcd.io/etcd/clientv3"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/pingcap/ticdc/dm/dm/config"
"github.com/pingcap/ticdc/dm/dm/pb"
"github.com/pingcap/ticdc/dm/dm/unit"
"github.com/pingcap/ticdc/dm/pkg/binlog"
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
"github.com/pingcap/ticdc/dm/pkg/conn"
tcontext "github.com/pingcap/ticdc/dm/pkg/context"
"github.com/pingcap/ticdc/dm/pkg/log"
"github.com/pingcap/ticdc/dm/pkg/utils"
)

const (
Expand Down Expand Up @@ -118,7 +120,39 @@ func (l *LightningLoader) Init(ctx context.Context) (err error) {
err = nil
})
l.checkPoint = checkpoint
l.toDB, l.toDBConns, err = createConns(tctx, l.cfg, 1)
toCfg, err := l.cfg.Clone()
if err != nil {
return err
}
if toCfg.To.Session == nil {
toCfg.To.Session = make(map[string]string)
}
toCfg.To.Session["time_zone"] = "+00:00"
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
l.toDB, l.toDBConns, err = createConns(tctx, toCfg, 1)
return err
}

func (l *LightningLoader) runLightning(ctx context.Context, cfg *lcfg.Config) error {
l.logger.Info("start runLightning")
l.Lock()
taskCtx, cancel := context.WithCancel(ctx)
l.cancel = cancel
l.Unlock()
l.logger.Info("start RunOnce")
err := l.core.RunOnce(taskCtx, cfg, nil)
l.logger.Info("end RunOnce")
failpoint.Inject("LightningLoadDataSlowDown", nil)
failpoint.Inject("LightningLoadDataSlowDownByTask", func(val failpoint.Value) {
tasks := val.(string)
taskNames := strings.Split(tasks, ",")
for _, taskName := range taskNames {
if l.cfg.Name == taskName {
l.logger.Info("inject failpoint LightningLoadDataSlowDownByTask", zap.String("task", taskName))
<-taskCtx.Done()
}
}
})
l.logger.Info("end runLightning")
return err
}

Expand Down Expand Up @@ -150,7 +184,9 @@ func (l *LightningLoader) restore(ctx context.Context) error {
}
cfg.Routes = l.cfg.RouteRules
cfg.Checkpoint.Driver = lcfg.CheckpointDriverMySQL
cfg.Checkpoint.Schema = config.TiDBLightningCheckpointPrefix + dbutil.ColumnName(l.workerName)
cfg.Checkpoint.Schema = config.TiDBLightningCheckpointPrefix + dbutil.TableName(l.workerName, l.cfg.Name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why we need dbutil.TableName, because in NewMySQLCheckpointsDB of lightning it will do escaping for backquotes 🤔

can we only use config.TiDBLightningCheckpointPrefix + l.workerName + l.cfg.Name as database name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that can not pass test

// cfg.Checkpoint.Schema = config.TiDBLightningCheckpointPrefix + dbutil.ColumnName(l.workerName)
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
cfg.Checkpoint.KeepAfterSuccess = lcfg.CheckpointOrigin
param := common.MySQLConnectParam{
Host: cfg.TiDB.Host,
Port: cfg.TiDB.Port,
Expand All @@ -161,25 +197,34 @@ func (l *LightningLoader) restore(ctx context.Context) error {
TLS: cfg.TiDB.TLS,
}
cfg.Checkpoint.DSN = param.ToDSN()
cfg.TiDB.Vars = make(map[string]string)
if l.cfg.To.Session != nil {
for k, v := range l.cfg.To.Session {
cfg.TiDB.Vars[k] = v
}
}
cfg.TiDB.Vars["time_zone"] = "+00:00"
lance6716 marked this conversation as resolved.
Show resolved Hide resolved

cfg.TiDB.StrSQLMode = l.cfg.LoaderConfig.SQLMode
if err = cfg.Adjust(ctx); err != nil {
return err
}
l.Lock()
taskCtx, cancel := context.WithCancel(ctx)
l.cancel = cancel
l.Unlock()
err = l.core.RunOnce(taskCtx, cfg, nil)
err = l.runLightning(ctx, cfg)
if err == nil {
err = lightning.CheckpointRemove(ctx, cfg, "all")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can keep the checkpoint in target db and clear it when user start-task with flag remove-meta as syncer do

Copy link
Contributor

@glorv glorv Nov 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. In the current dm implementation, we should delay clean checkpoint data when stop task

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we at least should make sure IsFreshTask returning false after a successful loading, so it needs we don't remove lightning checkpoint or let IsFreshTask inspecting LightningCheckpointList

}
if err == nil {
l.finish.Store(true)
offsetSQL := l.checkPoint.GenSQL(lightningCheckpointFile, 1)
err = l.toDBConns[0].executeSQL(tctx, []string{offsetSQL})
_ = l.checkPoint.UpdateOffset(lightningCheckpointFile, 1)
} else {
l.logger.Error("failed to runlightning", zap.Error(err))
}
} else {
l.finish.Store(true)
}
if l.cfg.Mode == config.ModeFull {
if err == nil && l.finish.Load() && l.cfg.Mode == config.ModeFull {
if err = delLoadTask(l.cli, l.cfg, l.workerName); err != nil {
return err
}
Expand Down Expand Up @@ -212,6 +257,7 @@ func (l *LightningLoader) Process(ctx context.Context, pr chan pb.ProcessResult)
}

if err := l.restore(ctx); err != nil && !utils.IsContextCanceledError(err) {
l.logger.Error("process error", zap.Error(err))
errs = append(errs, unit.NewProcessError(err))
}
isCanceled := false
Expand All @@ -220,7 +266,7 @@ func (l *LightningLoader) Process(ctx context.Context, pr chan pb.ProcessResult)
isCanceled = true
default:
}
l.logger.Info("lightning load end")
l.logger.Info("lightning load end", zap.Bool("IsCanceled", isCanceled))
pr <- pb.ProcessResult{
IsCanceled: isCanceled,
Errors: errs,
Expand Down
51 changes: 51 additions & 0 deletions dm/tests/lightning_load_task/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
---
name: load_task1
task-mode: all
is-sharding: false
meta-schema: "dm_meta"
# enable-heartbeat: true
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
heartbeat-update-interval: 1
heartbeat-report-interval: 1

target-database:
host: "127.0.0.1"
port: 4000
user: "test"
password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

- source-id: "mysql-replica-02"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

tidb:
backend: "tidb"

block-allow-list:
instance:
do-dbs: ["load_task1"]

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
51 changes: 51 additions & 0 deletions dm/tests/lightning_load_task/conf/dm-task2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
---
name: load_task2
task-mode: all
is-sharding: false
meta-schema: "dm_meta"
# enable-heartbeat: true
heartbeat-update-interval: 1
heartbeat-report-interval: 1

target-database:
host: "127.0.0.1"
port: 4000
user: "test"
password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

- source-id: "mysql-replica-02"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

tidb:
backend: "tidb"

block-allow-list:
instance:
do-dbs: ["load_task2"]

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
45 changes: 45 additions & 0 deletions dm/tests/lightning_load_task/conf/dm-task3.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
---
name: load_task3
task-mode: all
is-sharding: false
meta-schema: "dm_meta"
# enable-heartbeat: true
heartbeat-update-interval: 1
heartbeat-report-interval: 1

target-database:
host: "127.0.0.1"
port: 4000
user: "test"
password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="

mysql-instances:
- source-id: "mysql-replica-02"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["load_task3"]

tidb:
backend: "tidb"

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
45 changes: 45 additions & 0 deletions dm/tests/lightning_load_task/conf/dm-task4.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
---
name: load_task4
task-mode: all
is-sharding: false
meta-schema: "dm_meta"
# enable-heartbeat: true
heartbeat-update-interval: 1
heartbeat-report-interval: 1

target-database:
host: "127.0.0.1"
port: 4000
user: "test"
password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["load_task4"]

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

tidb:
backend: "tidb"

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
Loading