Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: enable setting conflict max-error #40874

Merged
merged 2 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 50 additions & 10 deletions br/pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,32 +343,64 @@ type MaxError struct {
// In TiDB backend, this also includes all possible SQL errors raised from INSERT,
// such as unique key conflict when `on-duplicate` is set to `error`.
// When tolerated, the row causing the error will be skipped, and adds 1 to the counter.
// The default value is zero, which means that such errors are not tolerated.
Type atomic.Int64 `toml:"type" json:"type"`

// Conflict is the maximum number of unique key conflicts in local backend accepted.
// When tolerated, every pair of conflict adds 1 to the counter.
// Those pairs will NOT be deleted from the target. Conflict resolution is performed separately.
// TODO Currently this is hard-coded to infinity.
Conflict atomic.Int64 `toml:"conflict" json:"-"`
// The default value is max int64, which means conflict errors will be recorded as much as possible.
// Sometimes the actual number of conflict records logged will be greater than the value configured here,
// because conflict error data are recorded batch by batch.
// If the limit is reached in a single batch, the entire batch of records will be persisted before an error is reported.
Conflict atomic.Int64 `toml:"conflict" json:"conflict"`
}

func (cfg *MaxError) UnmarshalTOML(v interface{}) error {
defaultValMap := map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
"conflict": math.MaxInt64,
}
// set default value first
cfg.Syntax.Store(defaultValMap["syntax"])
cfg.Charset.Store(defaultValMap["charset"])
cfg.Type.Store(defaultValMap["type"])
cfg.Conflict.Store(defaultValMap["conflict"])
switch val := v.(type) {
case int64:
// ignore val that is smaller than 0
if val < 0 {
val = 0
if val >= 0 {
// only set type error
cfg.Type.Store(val)
}
cfg.Syntax.Store(0)
cfg.Charset.Store(math.MaxInt64)
cfg.Type.Store(val)
cfg.Conflict.Store(math.MaxInt64)
return nil
case map[string]interface{}:
// TODO support stuff like `max-error = { charset = 1000, type = 1000 }` if proved useful.
// support stuff like `max-error = { charset = 1000, type = 1000 }`.
getVal := func(k string, v interface{}) int64 {
defaultVal, ok := defaultValMap[k]
if !ok {
return 0
}
iVal, ok := v.(int64)
if !ok || iVal < 0 {
return defaultVal
}
return iVal
}
for k, v := range val {
switch k {
case "type":
cfg.Type.Store(getVal(k, v))
case "conflict":
cfg.Conflict.Store(getVal(k, v))
}
}
return nil
default:
return errors.Errorf("invalid max-error '%v', should be an integer or a map of string:int64", v)
}
return errors.Errorf("invalid max-error '%v', should be an integer", v)
}

// DuplicateResolutionAlgorithm is the config type of how to resolve duplicates.
Expand Down Expand Up @@ -805,8 +837,16 @@ func (cfg *Config) LoadFromTOML(data []byte) error {
unusedGlobalKeyStrs[key.String()] = struct{}{}
}

iterateUnusedKeys:
for _, key := range unusedConfigKeys {
keyStr := key.String()
switch keyStr {
// these keys are not counted as decoded by the toml decoder, but they actually are decoded,
// because the corresponding unmarshal logic handles these keys' decoding in a custom way
case "lightning.max-error.type",
"lightning.max-error.conflict":
continue iterateUnusedKeys
}
if _, found := unusedGlobalKeyStrs[keyStr]; found {
bothUnused = append(bothUnused, keyStr)
} else {
Expand Down
121 changes: 121 additions & 0 deletions br/pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"flag"
"fmt"
"math"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -561,6 +562,126 @@ func TestDurationUnmarshal(t *testing.T) {
require.Regexp(t, "time: unknown unit .?x.? in duration .?13x20s.?", err.Error())
}

// TestMaxErrorUnmarshal decodes a series of TOML snippets into a
// config.Lightning value and checks the resulting MaxError counters
// (Syntax, Charset, Type, Conflict), or the exact decoding error text.
//
// Each scenario supplies either the expected counter values or the expected
// error string; a non-empty error string means decoding must fail.
func TestMaxErrorUnmarshal(t *testing.T) {
	type scenario struct {
		name    string
		input   string
		want    map[string]int64
		wantErr string
	}

	scenarios := []scenario{
		{
			name:  "Normal_Int",
			input: `max-error = 123`,
			want: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     123,
				"conflict": math.MaxInt64,
			},
		},
		{
			name:  "Abnormal_Negative_Int",
			input: `max-error = -123`,
			want: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     0,
				"conflict": math.MaxInt64,
			},
		},
		{
			name:    "Abnormal_String",
			input:   `max-error = "abcde"`,
			wantErr: "invalid max-error 'abcde', should be an integer or a map of string:int64",
		},
		{
			// syntax and charset are present in the input, but the expected
			// values below keep them at 0 and math.MaxInt64 respectively.
			name: "Normal_Map_All_Set",
			input: `[max-error]
syntax = 1
charset = 2
type = 3
conflict = 4
`,
			want: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     3,
				"conflict": 4,
			},
		},
		{
			name: "Normal_Map_Partial_Set",
			input: `[max-error]
conflict = 1000
`,
			want: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     0,
				"conflict": 1000,
			},
		},
		{
			name:  "Normal_OneLineMap_Partial_Set",
			input: `max-error = { conflict = 1000, type = 123 }`,
			want: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     123,
				"conflict": 1000,
			},
		},
		{
			// An unknown key is expected to decode without error.
			name: "Normal_Map_Partial_Set_Invalid_Key",
			input: `[max-error]
conflict = 1000
not_exist = 123
`,
			want: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     0,
				"conflict": 1000,
			},
		},
		{
			// A negative value is expected to leave the counter at 0.
			name: "Normal_Map_Partial_Set_Invalid_Value",
			input: `[max-error]
conflict = 1000
type = -123
`,
			want: map[string]int64{
				"syntax":   0,
				"charset":  math.MaxInt64,
				"type":     0,
				"conflict": 1000,
			},
		},
		{
			// A bare word is not a valid TOML value, so decoding must fail.
			name: "Normal_Map_Partial_Set_Invalid_ValueType",
			input: `[max-error]
conflict = 1000
type = abc
`,
			wantErr: `toml: line 3 (last key "max-error.type"): expected value but found "abc" instead`,
		},
	}

	for i := range scenarios {
		sc := &scenarios[i]

		cfg := &config.Lightning{}
		err := toml.Unmarshal([]byte(sc.input), cfg)

		if sc.wantErr != "" {
			require.Errorf(t, err, "test case: %s", sc.name)
			require.Equalf(t, sc.wantErr, err.Error(), "test case: %s", sc.name)
			continue
		}

		require.NoErrorf(t, err, "test case: %s", sc.name)
		require.Equalf(t, sc.want["syntax"], cfg.MaxError.Syntax.Load(), "test case: %s", sc.name)
		require.Equalf(t, sc.want["charset"], cfg.MaxError.Charset.Load(), "test case: %s", sc.name)
		require.Equalf(t, sc.want["type"], cfg.MaxError.Type.Load(), "test case: %s", sc.name)
		require.Equalf(t, sc.want["conflict"], cfg.MaxError.Conflict.Load(), "test case: %s", sc.name)
	}
}

func TestDurationMarshalJSON(t *testing.T) {
duration := config.Duration{}
err := duration.UnmarshalText([]byte("13m20s"))
Expand Down
31 changes: 22 additions & 9 deletions br/pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ func (em *ErrorManager) RecordTypeError(
if em.remainingError.Type.Dec() < 0 {
threshold := em.configError.Type.Load()
if threshold > 0 {
encodeErr = errors.Annotatef(encodeErr, "meet errors exceed the max-error.type threshold '%d'",
encodeErr = errors.Annotatef(encodeErr,
"The number of type errors exceeds the threshold configured by `max-error.type`: '%d'",
em.configError.Type.Load())
}
return encodeErr
Expand Down Expand Up @@ -241,25 +242,28 @@ func (em *ErrorManager) RecordDataConflictError(
tableName string,
conflictInfos []DataConflictInfo,
) error {
var gerr error
if len(conflictInfos) == 0 {
return nil
}

if em.remainingError.Conflict.Sub(int64(len(conflictInfos))) < 0 {
threshold := em.configError.Conflict.Load()
return errors.Errorf(" meet errors exceed the max-error.conflict threshold '%d'", threshold)
// Still need to record this batch of conflict records, and then return this error at last.
// Otherwise, if max-error.conflict is set to a very small value, none of the conflict errors will be recorded.
gerr = errors.Errorf("The number of conflict errors exceeds the threshold configured by `max-error.conflict`: '%d'", threshold)
}

if em.db == nil {
return nil
return gerr
}

exec := common.SQLWithRetry{
DB: em.db,
Logger: logger,
HideQueryLog: redact.NeedRedact(),
}
return exec.Transact(ctx, "insert data conflict error record", func(c context.Context, txn *sql.Tx) error {
if err := exec.Transact(ctx, "insert data conflict error record", func(c context.Context, txn *sql.Tx) error {
sb := &strings.Builder{}
fmt.Fprintf(sb, insertIntoConflictErrorData, em.schemaEscaped)
var sqlArgs []interface{}
Expand All @@ -279,7 +283,10 @@ func (em *ErrorManager) RecordDataConflictError(
}
_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
return err
})
}); err != nil {
gerr = err
}
return gerr
}

func (em *ErrorManager) RecordIndexConflictError(
Expand All @@ -290,25 +297,28 @@ func (em *ErrorManager) RecordIndexConflictError(
conflictInfos []DataConflictInfo,
rawHandles, rawRows [][]byte,
) error {
var gerr error
if len(conflictInfos) == 0 {
return nil
}

if em.remainingError.Conflict.Sub(int64(len(conflictInfos))) < 0 {
threshold := em.configError.Conflict.Load()
return errors.Errorf(" meet errors exceed the max-error.conflict threshold %d", threshold)
// Still need to record this batch of conflict records, and then return this error at last.
// Otherwise, if max-error.conflict is set to a very small value, none of the conflict errors will be recorded.
gerr = errors.Errorf("The number of conflict errors exceeds the threshold configured by `max-error.conflict`: '%d'", threshold)
}

if em.db == nil {
return nil
return gerr
}

exec := common.SQLWithRetry{
DB: em.db,
Logger: logger,
HideQueryLog: redact.NeedRedact(),
}
return exec.Transact(ctx, "insert index conflict error record", func(c context.Context, txn *sql.Tx) error {
if err := exec.Transact(ctx, "insert index conflict error record", func(c context.Context, txn *sql.Tx) error {
sb := &strings.Builder{}
fmt.Fprintf(sb, insertIntoConflictErrorIndex, em.schemaEscaped)
var sqlArgs []interface{}
Expand All @@ -331,7 +341,10 @@ func (em *ErrorManager) RecordIndexConflictError(
}
_, err := txn.ExecContext(c, sb.String(), sqlArgs...)
return err
})
}); err != nil {
gerr = err
}
return gerr
}

// ResolveAllConflictKeys query all conflicting rows (handle and their
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE testtbl (
id INTEGER PRIMARY KEY,
val1 VARCHAR(40) NOT NULL,
INDEX `idx_val1` (`val1`)
);
16 changes: 16 additions & 0 deletions br/tests/lightning_config_max_error/data/mytest.testtbl.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
id,val1
1,"aaa01"
2,"aaa01"
3,"aaa02"
4,"aaa02"
5,"aaa05"
6,"aaa06"
7,"aaa07"
8,"aaa08"
9,"aaa09"
10,"aaa10"
1,"bbb01"
2,"bbb02"
3,"bbb03"
4,"bbb04"
5,"bbb05"
8 changes: 8 additions & 0 deletions br/tests/lightning_config_max_error/err_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[lightning.max-error]
conflict = 4

[mydumper.csv]
header = true

[tikv-importer]
duplicate-resolution = 'remove'
8 changes: 8 additions & 0 deletions br/tests/lightning_config_max_error/normal_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[lightning.max-error]
conflict = 20

[mydumper.csv]
header = true

[tikv-importer]
duplicate-resolution = 'remove'
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[lightning]
max-error = 0 # a bare integer only sets the type error threshold (max-error.type)

[mydumper.csv]
header = true

[tikv-importer]
duplicate-resolution = 'remove'
Loading