Skip to content

Commit

Permalink
lightning: support keep checkpoint data after success without changin…
Browse files Browse the repository at this point in the history
…g schema name (#28472)
  • Loading branch information
glorv authored Sep 30, 2021
1 parent 01908fc commit 5f797c3
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 13 deletions.
78 changes: 73 additions & 5 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,74 @@ func (t PostOpLevel) String() string {
}
}

// CheckpointKeepStrategy selects how checkpoint data is treated when it is
// cleaned up: removed, kept under a new name, or kept as-is.
type CheckpointKeepStrategy int

// The numeric values are spelled out explicitly; CheckpointRemove is the
// zero value of CheckpointKeepStrategy.
const (
	// CheckpointRemove removes the checkpoint data.
	CheckpointRemove CheckpointKeepStrategy = 0
	// CheckpointRename keeps the checkpoint data by renaming the checkpoint
	// file/db according to the task id.
	CheckpointRename CheckpointKeepStrategy = 1
	// CheckpointOrigin keeps the checkpoint data unchanged.
	CheckpointOrigin CheckpointKeepStrategy = 2
)

// UnmarshalTOML sets t from an already-decoded TOML value.
//
// A string is delegated to FromStringValue. A bool maps true to
// CheckpointRename and false to CheckpointRemove. A value of any other type
// yields an error and leaves t untouched.
func (t *CheckpointKeepStrategy) UnmarshalTOML(v interface{}) error {
	if text, isString := v.(string); isString {
		return t.FromStringValue(text)
	}

	keep, isBool := v.(bool)
	if !isBool {
		return errors.Errorf("invalid checkpoint keep strategy '%v', please choose valid option between ['remove', 'rename', 'origin']", v)
	}

	// Boolean form: true means keep (by renaming), false means remove.
	strategy := CheckpointRemove
	if keep {
		strategy = CheckpointRename
	}
	*t = strategy
	return nil
}

// MarshalText returns the bytes of t.String(). The returned error is always
// nil.
func (t CheckpointKeepStrategy) MarshalText() ([]byte, error) {
	name := t.String()
	return []byte(name), nil
}

// FromStringValue sets t from its textual form (for example a command-line
// parameter). Matching is case-insensitive:
//
//   - "remove" or "false" selects CheckpointRemove
//   - "rename" or "true" selects CheckpointRename
//   - "origin" selects CheckpointOrigin
//
// Any other input returns an error and leaves t untouched.
func (t *CheckpointKeepStrategy) FromStringValue(s string) error {
	var parsed CheckpointKeepStrategy
	switch lowered := strings.ToLower(s); lowered {
	case "origin":
		parsed = CheckpointOrigin
	case "rename", "true":
		parsed = CheckpointRename
	//nolint:goconst // This 'false' and other 'false's aren't the same.
	case "remove", "false":
		parsed = CheckpointRemove
	default:
		return errors.Errorf("invalid checkpoint keep strategy '%s', please choose valid option between ['remove', 'rename', 'origin']", s)
	}
	*t = parsed
	return nil
}

// MarshalJSON returns t.String() wrapped in double quotes. No escaping is
// applied to the name. The returned error is always nil.
func (t *CheckpointKeepStrategy) MarshalJSON() ([]byte, error) {
	name := t.String()
	quoted := make([]byte, 0, len(name)+2)
	quoted = append(quoted, '"')
	quoted = append(quoted, name...)
	quoted = append(quoted, '"')
	return quoted, nil
}

// UnmarshalJSON strips any leading and trailing double quotes from data and
// passes the remainder to FromStringValue, returning its result.
func (t *CheckpointKeepStrategy) UnmarshalJSON(data []byte) error {
	raw := string(data)
	unquoted := strings.Trim(raw, `"`)
	return t.FromStringValue(unquoted)
}

// String returns the textual name of the strategy: "remove", "rename" or
// "origin".
//
// It panics if t is not one of the declared CheckpointKeepStrategy constants.
// MarshalText and MarshalJSON are built on String, so they panic for such a
// value too.
func (t CheckpointKeepStrategy) String() string {
	switch t {
	case CheckpointRemove:
		return "remove"
	case CheckpointRename:
		return "rename"
	case CheckpointOrigin:
		return "origin"
	default:
		// The message names this type; the previous text ("post process type")
		// described a different setting and was misleading.
		panic(fmt.Sprintf("invalid checkpoint keep strategy '%d'", int(t)))
	}
}

// MaxError configures the maximum number of acceptable errors per kind.
type MaxError struct {
// Syntax is the maximum number of syntax errors accepted.
Expand Down Expand Up @@ -396,11 +464,11 @@ type TikvImporter struct {
}

// Checkpoint groups the checkpoint-related settings; the struct tags give the
// TOML and JSON key of each field.
// NOTE(review): this span is a rendered diff hunk, so every field is listed
// twice. The first five field lines appear to be the pre-change side
// (KeepAfterSuccess as bool) and the last five the post-change side
// (KeepAfterSuccess as CheckpointKeepStrategy) — confirm against the real file.
type Checkpoint struct {
Schema string `toml:"schema" json:"schema"`
DSN string `toml:"dsn" json:"-"` // DSN may contain password, don't expose this to JSON.
Driver string `toml:"driver" json:"driver"`
Enable bool `toml:"enable" json:"enable"`
KeepAfterSuccess bool `toml:"keep-after-success" json:"keep-after-success"`
Schema string `toml:"schema" json:"schema"`
DSN string `toml:"dsn" json:"-"` // DSN may contain password, don't expose this to JSON.
Driver string `toml:"driver" json:"driver"`
Enable bool `toml:"enable" json:"enable"`
KeepAfterSuccess CheckpointKeepStrategy `toml:"keep-after-success" json:"keep-after-success"`
}

type Cron struct {
Expand Down
46 changes: 46 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,3 +794,49 @@ func (s *configTestSuite) TestDataCharacterSet(c *C) {
}
}
}

// TestCheckpointKeepStrategy covers the CheckpointKeepStrategy conversions:
// direct UnmarshalTOML calls with bool and string inputs, the value left in a
// Checkpoint when keep-after-success is absent, decoding keep-after-success
// through toml.Decode, and MarshalText output.
func (s *configTestSuite) TestCheckpointKeepStrategy(c *C) {
	expectedByInput := map[interface{}]config.CheckpointKeepStrategy{
		true:     config.CheckpointRename,
		false:    config.CheckpointRemove,
		"remove": config.CheckpointRemove,
		"rename": config.CheckpointRename,
		"origin": config.CheckpointOrigin,
	}

	// Direct calls: every accepted input must map to its strategy.
	for input, want := range expectedByInput {
		var got config.CheckpointKeepStrategy
		c.Assert(got.UnmarshalTOML(input), IsNil)
		c.Assert(got, Equals, want)
	}

	// A document that never mentions keep-after-success leaves the field at
	// CheckpointRemove.
	var defaults config.Checkpoint
	_, decodeErr := toml.Decode("enable = true\r\n", &defaults)
	c.Assert(decodeErr, IsNil)
	c.Assert(defaults.KeepAfterSuccess, Equals, config.CheckpointRemove)

	// Through the TOML decoder: strings have to be quoted in the document,
	// booleans are written bare.
	for input, want := range expectedByInput {
		var literal interface{} = input
		if text, isString := input.(string); isString {
			literal = `"` + text + `"`
		}
		doc := fmt.Sprintf("keep-after-success = %v\r\n", literal)

		var decoded config.Checkpoint
		_, err := toml.Decode(doc, &decoded)
		c.Assert(err, IsNil)
		c.Assert(decoded.KeepAfterSuccess, Equals, want)
	}

	// MarshalText must produce the canonical name of each strategy.
	textByStrategy := map[config.CheckpointKeepStrategy]string{
		config.CheckpointRemove: "remove",
		config.CheckpointRename: "rename",
		config.CheckpointOrigin: "origin",
	}
	for strategy, wantText := range textByStrategy {
		text, err := strategy.MarshalText()
		c.Assert(err, IsNil)
		c.Assert(text, DeepEquals, []byte(wantText))
	}
}
7 changes: 4 additions & 3 deletions br/pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1713,15 +1713,16 @@ func (rc *Controller) cleanCheckpoints(ctx context.Context) error {
}

logger := log.With(
zap.Bool("keepAfterSuccess", rc.cfg.Checkpoint.KeepAfterSuccess),
zap.Stringer("keepAfterSuccess", rc.cfg.Checkpoint.KeepAfterSuccess),
zap.Int64("taskID", rc.cfg.TaskID),
)

task := logger.Begin(zap.InfoLevel, "clean checkpoints")
var err error
if rc.cfg.Checkpoint.KeepAfterSuccess {
switch rc.cfg.Checkpoint.KeepAfterSuccess {
case config.CheckpointRename:
err = rc.checkpointsDB.MoveCheckpoints(ctx, rc.cfg.TaskID)
} else {
case config.CheckpointRemove:
err = rc.checkpointsDB.RemoveCheckpoint(ctx, "all")
}
task.End(zap.ErrorLevel, err)
Expand Down
2 changes: 1 addition & 1 deletion br/tests/lightning_checkpoint/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ table-concurrency = 1
enable = true
schema = "tidb_lightning_checkpoint_test_cppk"
driver = "mysql"
keep-after-success = true
keep-after-success = "origin"

[mydumper]
read-block-size = 1
2 changes: 1 addition & 1 deletion br/tests/lightning_checkpoint/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ for BACKEND in importer local; do
run_lightning -d "$DBPATH" --backend $BACKEND --enable-checkpoint=1
run_sql "$PARTIAL_IMPORT_QUERY"
check_contains "s: $(( (1000 * $CHUNK_COUNT + 1001) * $CHUNK_COUNT * $TABLE_COUNT ))"
run_sql 'SELECT count(*) FROM `tidb_lightning_checkpoint_test_cppk.1357924680.bak`.table_v7 WHERE status >= 200'
run_sql 'SELECT count(*) FROM `tidb_lightning_checkpoint_test_cppk`.table_v7 WHERE status >= 200'
check_contains "count(*): $TABLE_COUNT"

# Ensure there is no dangling open engines
Expand Down
9 changes: 6 additions & 3 deletions br/tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,12 @@ driver = "file"
# For "mysql" driver, the DSN is a URL in the form "USER:PASS@tcp(HOST:PORT)/".
# If not specified, the TiDB server from the [tidb] section will be used to store the checkpoints.
#dsn = "/tmp/tidb_lightning_checkpoint.pb"
# Whether to keep the checkpoints after all data are imported. If false, the checkpoints will be deleted. The schema
# needs to be dropped manually, however.
#keep-after-success = false
# Whether to keep the checkpoints after all data are imported.
# Valid options:
# - "remove" (default): the checkpoints are deleted.
# - "rename": the checkpoint data is kept, but the checkpoint schema is renamed to `schema.{taskID}.bak`.
# - "origin": the checkpoint data is kept unchanged.
#keep-after-success = "remove"

[tikv-importer]
# Delivery backend, can be "importer", "local" or "tidb".
Expand Down

0 comments on commit 5f797c3

Please sign in to comment.