Skip to content

Commit

Permalink
compactor: fix duplicate entry in safemode (pingcap#3432) (pingcap#3434
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and 3AceShowHand committed Jan 13, 2022
1 parent 76de75d commit 263f852
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 46 deletions.
13 changes: 13 additions & 0 deletions dm/syncer/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package syncer

import (
"fmt"
"strconv"
"time"

"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -165,6 +167,15 @@ func (c *compactor) compactJob(j *job) {
}

key := j.dml.identifyKey()

failpoint.Inject("DownstreamIdentifyKeyCheckInCompact", func(v failpoint.Value) {
value, err := strconv.Atoi(key)
upper := v.(int)
if err != nil || value > upper {
panic(fmt.Sprintf("downstream identifyKey check failed. key value %v should less than %v", value, upper))
}
})

prevPos, ok := tableKeyMap[key]
// if no such key in the buffer, add it
if !ok {
Expand All @@ -184,6 +195,8 @@ func (c *compactor) compactJob(j *job) {
j.dml.oldValues = nil
j.dml.originOldValues = nil
j.dml.op = insert
// DELETE + INSERT + UPDATE => INSERT with safemode
j.dml.safeMode = prevJob.dml.safeMode
} else if prevJob.tp == update {
// UPDATE + UPDATE => UPDATE
j.dml.oldValues = prevJob.dml.oldValues
Expand Down
4 changes: 3 additions & 1 deletion dm/syncer/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,13 @@ func (s *testSyncerSuite) TestCompactorSafeMode(c *C) {
newDML(update, false, targetTableID, sourceTable, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex, downTi),
newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi),
newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex, downTi),
newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi),
newDML(update, false, targetTableID, sourceTable, []interface{}{2, 2, "b"}, []interface{}{2, 2, "c"}, []interface{}{2, 2, "b"}, []interface{}{2, 2, "c"}, ti.Columns, ti, tiIndex, downTi),
},
output: []*DML{
newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex, downTi),
newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi),
newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex, downTi),
newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{2, 2, "c"}, nil, []interface{}{2, 2, "c"}, ti.Columns, ti, tiIndex, downTi),
},
},
}
Expand Down
50 changes: 26 additions & 24 deletions dm/syncer/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
updateDML = dmlOpType(update)
deleteDML = dmlOpType(del)
insertOnDuplicateDML dmlOpType = iota + 1
replaceDML
)

func (op dmlOpType) String() (str string) {
Expand All @@ -54,6 +55,8 @@ func (op dmlOpType) String() (str string) {
return "delete"
case insertOnDuplicateDML:
return "insert on duplicate update"
case replaceDML:
return "replace"
}
return
}
Expand Down Expand Up @@ -785,11 +788,15 @@ func (dml *DML) genDeleteSQL() ([]string, [][]interface{}) {
}

// genInsertSQL generates a `INSERT`.
// if in safemode, generates a `INSERT ON DUPLICATE UPDATE` statement.
// if in safemode, generates a `REPLACE` statement.
func (dml *DML) genInsertSQL() ([]string, [][]interface{}) {
var buf strings.Builder
buf.Grow(1024)
buf.WriteString("INSERT INTO ")
if dml.safeMode {
buf.WriteString("REPLACE INTO ")
} else {
buf.WriteString("INSERT INTO ")
}
buf.WriteString(dml.targetTableID)
buf.WriteString(" (")
for i, column := range dml.columns {
Expand All @@ -810,16 +817,6 @@ func (dml *DML) genInsertSQL() ([]string, [][]interface{}) {
buf.WriteString("?)")
}
}
if dml.safeMode {
buf.WriteString(" ON DUPLICATE KEY UPDATE ")
for i, column := range dml.columns {
col := dbutil.ColumnName(column.Name.O)
buf.WriteString(col + "=VALUES(" + col + ")")
if i != len(dml.columns)-1 {
buf.WriteByte(',')
}
}
}
return []string{buf.String()}, [][]interface{}{dml.values}
}

Expand All @@ -837,16 +834,21 @@ func valuesHolder(n int) string {
return builder.String()
}

// genInsertOnDuplicateSQLMultipleRows generates a `INSERT` with multiple rows like 'INSERT INTO tb(a,b) VALUES (1,1),(2,2)'
// genInsertSQLMultipleRows generates a `INSERT` with multiple rows like 'INSERT INTO tb(a,b) VALUES (1,1),(2,2)'
// if replace, generates a `REPLACE' with multiple rows like 'REPLACE INTO tb(a,b) VALUES (1,1),(2,2)'
// if onDuplicate, generates a `INSERT ON DUPLICATE KEY UPDATE` statement like 'INSERT INTO tb(a,b) VALUES (1,1),(2,2) ON DUPLICATE KEY UPDATE a=VALUES(a),b=VALUES(b)'.
func genInsertOnDuplicateSQLMultipleRows(onDuplicate bool, dmls []*DML) ([]string, [][]interface{}) {
func genInsertSQLMultipleRows(op dmlOpType, dmls []*DML) ([]string, [][]interface{}) {
if len(dmls) == 0 {
return nil, nil
}

var buf strings.Builder
buf.Grow(1024)
buf.WriteString("INSERT INTO")
if op == replaceDML {
buf.WriteString("REPLACE INTO")
} else {
buf.WriteString("INSERT INTO")
}
buf.WriteString(" " + dmls[0].targetTableID + " (")
for i, column := range dmls[0].columns {
buf.WriteString(dbutil.ColumnName(column.Name.O))
Expand All @@ -866,7 +868,7 @@ func genInsertOnDuplicateSQLMultipleRows(onDuplicate bool, dmls []*DML) ([]strin
buf.WriteString(holder)
}

if onDuplicate {
if op == insertOnDuplicateDML {
buf.WriteString(" ON DUPLICATE KEY UPDATE ")
for i, column := range dmls[0].columns {
col := dbutil.ColumnName(column.Name.O)
Expand Down Expand Up @@ -927,10 +929,8 @@ func genSQLMultipleRows(op dmlOpType, dmls []*DML) (queries []string, args [][]i
log.L().Debug("generate DMLs with multiple rows", zap.Stringer("op", op), zap.Stringer("original op", dmls[0].op), zap.Int("rows", len(dmls)))
}
switch op {
case insertDML:
return genInsertOnDuplicateSQLMultipleRows(false, dmls)
case insertOnDuplicateDML:
return genInsertOnDuplicateSQLMultipleRows(true, dmls)
case insertDML, replaceDML, insertOnDuplicateDML:
return genInsertSQLMultipleRows(op, dmls)
case deleteDML:
return genDeleteSQLMultipleRows(dmls)
}
Expand Down Expand Up @@ -1052,17 +1052,19 @@ func genDMLsWithSameOp(dmls []*DML) ([]string, [][]interface{}) {
// group dmls with same dmlOp
for i, dml := range dmls {
curOp := dmlOpType(dml.op)
// if update statement didn't update identify values, regard it as insert on duplicate.
// if insert with safemode, regard it as insert on duplicate.
if (curOp == updateDML && !dml.updateIdentify()) || (curOp == insertDML && dml.safeMode) {
if curOp == updateDML && !dml.updateIdentify() && !dml.safeMode {
// if update statement didn't update identify values and not in safemode, regard it as insert on duplicate.
curOp = insertOnDuplicateDML
} else if curOp == insertDML && dml.safeMode {
// if insert with safemode, regard it as replace
curOp = replaceDML
}

if i == 0 {
lastOp = curOp
}

// now there are 4 situations: [insert, insert on duplicate(insert with safemode/update without identify keys), update(update identify keys), delete]
// now there are 5 situations: [insert, replace(insert with safemode), insert on duplicate(update without identify keys), update(update identify keys/update with safemode), delete]
if lastOp != curOp {
query, arg := genDMLsWithSameTable(lastOp, groupDMLs)
queries = append(queries, query...)
Expand Down
42 changes: 33 additions & 9 deletions dm/syncer/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (s *testSyncerSuite) TestGenSQL(c *C) {
},
{
newDML(insert, true, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex, nil),
[]string{"INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)"},
[]string{"REPLACE INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?)"},
[][]interface{}{{1, 2, 3, "haha"}},
},
{
Expand All @@ -332,7 +332,7 @@ func (s *testSyncerSuite) TestGenSQL(c *C) {
},
{
newDML(update, true, "`targetSchema`.`targetTable`", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex, nil),
[]string{"DELETE FROM `targetSchema`.`targetTable` WHERE `id` = ? LIMIT 1", "INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)"},
[]string{"DELETE FROM `targetSchema`.`targetTable` WHERE `id` = ? LIMIT 1", "REPLACE INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?)"},
[][]interface{}{{1}, {4, 5, 6, "hihi"}},
},
}
Expand Down Expand Up @@ -438,7 +438,7 @@ func (s *testSyncerSuite) TestGenDMLWithSameOp(c *C) {
newDML(insert, true, targetTableID1, sourceTable11, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti11.Columns, ti11, ti11Index, downTi11),
newDML(insert, true, targetTableID1, sourceTable11, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti11.Columns, ti11, ti11Index, downTi11),
newDML(insert, true, targetTableID1, sourceTable12, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti12.Columns, ti12, ti12Index, downTi12),
// update no index
// update no index but safemode
newDML(update, true, targetTableID1, sourceTable11, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, ti11.Columns, ti11, ti11Index, downTi11),
newDML(update, true, targetTableID1, sourceTable11, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, ti11.Columns, ti11, ti11Index, downTi11),
newDML(update, true, targetTableID1, sourceTable12, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, ti12.Columns, ti12, ti12Index, downTi12),
Expand Down Expand Up @@ -486,17 +486,29 @@ func (s *testSyncerSuite) TestGenDMLWithSameOp(c *C) {

expectQueries := []string{
// table1
"INSERT INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`name`=VALUES(`name`)",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?),(?,?,?),(?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"INSERT INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`name`=VALUES(`name`)",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"INSERT INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`name`=VALUES(`name`)",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"INSERT INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`name`=VALUES(`name`)",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1",
"REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)",
"DELETE FROM `db1`.`tb1` WHERE (`id`) IN ((?),(?),(?))",

// table2
"INSERT INTO `db2`.`tb2` (`id`,`col2`,`name`) VALUES (?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)",
"REPLACE INTO `db2`.`tb2` (`id`,`col2`,`name`) VALUES (?,?,?)",
"INSERT INTO `db2`.`tb2` (`id`,`col2`,`name`) VALUES (?,?,?)",
"INSERT INTO `db2`.`tb2` (`id`,`col3`,`name`) VALUES (?,?,?)",
"INSERT INTO `db2`.`tb2` (`id`,`col2`,`name`) VALUES (?,?,?),(?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)",
Expand All @@ -514,7 +526,19 @@ func (s *testSyncerSuite) TestGenDMLWithSameOp(c *C) {

expectArgs := [][]interface{}{
// table1
{1, 1, "a", 2, 2, "b", 3, 3, "c", 1, 1, "aa", 2, 2, "bb", 3, 3, "cc", 1, 4, "aa", 2, 5, "bb", 3, 6, "cc"},
{1, 1, "a", 2, 2, "b", 3, 3, "c"},
{1},
{1, 1, "aa"},
{2},
{2, 2, "bb"},
{3},
{3, 3, "cc"},
{1},
{1, 4, "aa"},
{2},
{2, 5, "bb"},
{3},
{3, 6, "cc"},
{1},
{4, 4, "aa"},
{2},
Expand Down
10 changes: 5 additions & 5 deletions dm/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ func (s *testSyncerSuite) TestRun(c *C) {
nil,
}, {
insert,
[]string{"INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"},
[]string{"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"},
[][]interface{}{{int64(580981944116838401), "a"}},
}, {
flush,
Expand All @@ -879,7 +879,7 @@ func (s *testSyncerSuite) TestRun(c *C) {
nil,
}, {
insert,
[]string{"INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"},
[]string{"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"},
[][]interface{}{{int64(580981944116838402), "b"}},
}, {
del,
Expand All @@ -888,7 +888,7 @@ func (s *testSyncerSuite) TestRun(c *C) {
}, {
// safe mode is true, will split update to delete + replace
update,
[]string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", "INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"},
[]string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"},
[][]interface{}{{int64(580981944116838402)}, {int64(580981944116838401), "b"}},
}, {
flush,
Expand Down Expand Up @@ -1131,15 +1131,15 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) {
nil,
}, {
insert,
[]string{"INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"},
[]string{"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"},
[][]interface{}{{int32(1), "a"}},
}, {
del,
[]string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1"},
[][]interface{}{{int32(1)}},
}, {
update,
[]string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", "INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"},
[]string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"},
[][]interface{}{{int32(2)}, {int32(1), "b"}},
}, {
// start from this event, location passes safeModeExitLocation and safe mode should exit
Expand Down
21 changes: 21 additions & 0 deletions dm/tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,27 @@ function run_sql_tidb_with_retry() {
fi
}

# shortcut for run tidb sql and check result with retry
function run_sql_tidb_with_retry_times() {
rc=0
for ((k=1; k<$3; k++)); do
run_sql_tidb "$1"
if grep -Fq "$2" "$TEST_DIR/sql_res.$TEST_NAME.txt"; then
rc=1
break
fi
echo "run tidb sql failed $k-th time, retry later"
sleep 2
done
if [[ $rc = 0 ]]; then
echo "TEST FAILED: OUTPUT DOES NOT CONTAIN '$2'"
echo "____________________________________"
cat "$TEST_DIR/sql_res.$TEST_NAME.txt"
echo "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
exit 1
fi
}

# shortcut for check log contain with retry
function check_log_contain_with_retry() {
text=$1
Expand Down
Loading

0 comments on commit 263f852

Please sign in to comment.