From 263f8525c351a15002e3ccaee6be897f822720f4 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 30 Dec 2021 18:37:50 +0800 Subject: [PATCH] compactor: fix duplicate entry in safemode (#3432) (#3434) (#4088) --- dm/syncer/compactor.go | 13 ++++ dm/syncer/compactor_test.go | 4 +- dm/syncer/dml.go | 50 +++++++------- dm/syncer/dml_test.go | 42 +++++++++--- dm/syncer/syncer_test.go | 10 +-- dm/tests/_utils/test_prepare | 21 ++++++ dm/tests/shardddl1/run.sh | 127 +++++++++++++++++++++++++++++++++-- 7 files changed, 221 insertions(+), 46 deletions(-) diff --git a/dm/syncer/compactor.go b/dm/syncer/compactor.go index e8b96dbd2a4..e49681aa4c7 100644 --- a/dm/syncer/compactor.go +++ b/dm/syncer/compactor.go @@ -14,6 +14,8 @@ package syncer import ( + "fmt" + "strconv" "time" "github.com/pingcap/failpoint" @@ -165,6 +167,15 @@ func (c *compactor) compactJob(j *job) { } key := j.dml.identifyKey() + + failpoint.Inject("DownstreamIdentifyKeyCheckInCompact", func(v failpoint.Value) { + value, err := strconv.Atoi(key) + upper := v.(int) + if err != nil || value > upper { + panic(fmt.Sprintf("downstream identifyKey check failed. key value %v should less than %v", value, upper)) + } + }) + prevPos, ok := tableKeyMap[key] // if no such key in the buffer, add it if !ok { @@ -184,6 +195,8 @@ func (c *compactor) compactJob(j *job) { j.dml.oldValues = nil j.dml.originOldValues = nil j.dml.op = insert + // DELETE + INSERT + UPDATE => INSERT with safemode + j.dml.safeMode = prevJob.dml.safeMode } else if prevJob.tp == update { // UPDATE + UPDATE => UPDATE j.dml.oldValues = prevJob.dml.oldValues diff --git a/dm/syncer/compactor_test.go b/dm/syncer/compactor_test.go index 19d374257a3..94a5b659a98 100644 --- a/dm/syncer/compactor_test.go +++ b/dm/syncer/compactor_test.go @@ -238,11 +238,13 @@ func (s *testSyncerSuite) TestCompactorSafeMode(c *C) { newDML(update, false, targetTableID, sourceTable, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex, downTi), newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi), newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex, downTi), + newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi), + newDML(update, false, targetTableID, sourceTable, []interface{}{2, 2, "b"}, []interface{}{2, 2, "c"}, []interface{}{2, 2, "b"}, []interface{}{2, 2, "c"}, ti.Columns, ti, tiIndex, downTi), }, output: []*DML{ newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti.Columns, ti, tiIndex, downTi), - newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti, tiIndex, downTi), newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti, tiIndex, downTi), + newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{2, 2, "c"}, nil, []interface{}{2, 2, "c"}, ti.Columns, ti, tiIndex, downTi), }, }, } diff --git a/dm/syncer/dml.go b/dm/syncer/dml.go index d4a04245ed7..a9641a851f9 100644 --- a/dm/syncer/dml.go +++ b/dm/syncer/dml.go @@ -42,6 +42,7 @@ const ( updateDML = dmlOpType(update) deleteDML = dmlOpType(del) insertOnDuplicateDML dmlOpType = iota + 1 + 
replaceDML ) func (op dmlOpType) String() (str string) { @@ -54,6 +55,8 @@ func (op dmlOpType) String() (str string) { return "delete" case insertOnDuplicateDML: return "insert on duplicate update" + case replaceDML: + return "replace" } return } @@ -785,11 +788,15 @@ func (dml *DML) genDeleteSQL() ([]string, [][]interface{}) { } // genInsertSQL generates a `INSERT`. -// if in safemode, generates a `INSERT ON DUPLICATE UPDATE` statement. +// if in safemode, generates a `REPLACE` statement. func (dml *DML) genInsertSQL() ([]string, [][]interface{}) { var buf strings.Builder buf.Grow(1024) - buf.WriteString("INSERT INTO ") + if dml.safeMode { + buf.WriteString("REPLACE INTO ") + } else { + buf.WriteString("INSERT INTO ") + } buf.WriteString(dml.targetTableID) buf.WriteString(" (") for i, column := range dml.columns { @@ -810,16 +817,6 @@ func (dml *DML) genInsertSQL() ([]string, [][]interface{}) { buf.WriteString("?)") } } - if dml.safeMode { - buf.WriteString(" ON DUPLICATE KEY UPDATE ") - for i, column := range dml.columns { - col := dbutil.ColumnName(column.Name.O) - buf.WriteString(col + "=VALUES(" + col + ")") - if i != len(dml.columns)-1 { - buf.WriteByte(',') - } - } - } return []string{buf.String()}, [][]interface{}{dml.values} } @@ -837,16 +834,21 @@ func valuesHolder(n int) string { return builder.String() } -// genInsertOnDuplicateSQLMultipleRows generates a `INSERT` with multiple rows like 'INSERT INTO tb(a,b) VALUES (1,1),(2,2)' +// genInsertSQLMultipleRows generates a `INSERT` with multiple rows like 'INSERT INTO tb(a,b) VALUES (1,1),(2,2)' +// if replace, generates a `REPLACE' with multiple rows like 'REPLACE INTO tb(a,b) VALUES (1,1),(2,2)' // if onDuplicate, generates a `INSERT ON DUPLICATE KEY UPDATE` statement like 'INSERT INTO tb(a,b) VALUES (1,1),(2,2) ON DUPLICATE KEY UPDATE a=VALUES(a),b=VALUES(b)'. -func genInsertOnDuplicateSQLMultipleRows(onDuplicate bool, dmls []*DML) ([]string, [][]interface{}) { +func genInsertSQLMultipleRows(op dmlOpType, dmls []*DML) ([]string, [][]interface{}) { if len(dmls) == 0 { return nil, nil } var buf strings.Builder buf.Grow(1024) - buf.WriteString("INSERT INTO") + if op == replaceDML { + buf.WriteString("REPLACE INTO") + } else { + buf.WriteString("INSERT INTO") + } buf.WriteString(" " + dmls[0].targetTableID + " (") for i, column := range dmls[0].columns { buf.WriteString(dbutil.ColumnName(column.Name.O)) @@ -866,7 +868,7 @@ func genInsertOnDuplicateSQLMultipleRows(onDuplicate bool, dmls []*DML) ([]strin buf.WriteString(holder) } - if onDuplicate { + if op == insertOnDuplicateDML { buf.WriteString(" ON DUPLICATE KEY UPDATE ") for i, column := range dmls[0].columns { col := dbutil.ColumnName(column.Name.O) @@ -927,10 +929,8 @@ func genSQLMultipleRows(op dmlOpType, dmls []*DML) (queries []string, args [][]i log.L().Debug("generate DMLs with multiple rows", zap.Stringer("op", op), zap.Stringer("original op", dmls[0].op), zap.Int("rows", len(dmls))) } switch op { - case insertDML: - return genInsertOnDuplicateSQLMultipleRows(false, dmls) - case insertOnDuplicateDML: - return genInsertOnDuplicateSQLMultipleRows(true, dmls) + case insertDML, replaceDML, insertOnDuplicateDML: + return genInsertSQLMultipleRows(op, dmls) case deleteDML: return genDeleteSQLMultipleRows(dmls) } @@ -1052,17 +1052,19 @@ func genDMLsWithSameOp(dmls []*DML) ([]string, [][]interface{}) { // group dmls with same dmlOp for i, dml := range dmls { curOp := dmlOpType(dml.op) - // if update statement didn't update identify values, regard it as insert on duplicate. 
- // if insert with safemode, regard it as insert on duplicate. - if (curOp == updateDML && !dml.updateIdentify()) || (curOp == insertDML && dml.safeMode) { + if curOp == updateDML && !dml.updateIdentify() && !dml.safeMode { + // if update statement didn't update identify values and not in safemode, regard it as insert on duplicate. curOp = insertOnDuplicateDML + } else if curOp == insertDML && dml.safeMode { + // if insert with safemode, regard it as replace + curOp = replaceDML } if i == 0 { lastOp = curOp } - // now there are 4 situations: [insert, insert on duplicate(insert with safemode/update without identify keys), update(update identify keys), delete] + // now there are 5 situations: [insert, replace(insert with safemode), insert on duplicate(update without identify keys), update(update identify keys/update with safemode), delete] if lastOp != curOp { query, arg := genDMLsWithSameTable(lastOp, groupDMLs) queries = append(queries, query...) diff --git a/dm/syncer/dml_test.go b/dm/syncer/dml_test.go index bd4883bc634..b1a6910ff1d 100644 --- a/dm/syncer/dml_test.go +++ b/dm/syncer/dml_test.go @@ -317,7 +317,7 @@ func (s *testSyncerSuite) TestGenSQL(c *C) { }, { newDML(insert, true, "`targetSchema`.`targetTable`", &filter.Table{}, nil, []interface{}{1, 2, 3, "haha"}, nil, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex, nil), - []string{"INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)"}, + []string{"REPLACE INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?)"}, [][]interface{}{{1, 2, 3, "haha"}}, }, { @@ -332,7 +332,7 @@ func (s *testSyncerSuite) TestGenSQL(c *C) { }, { newDML(update, true, "`targetSchema`.`targetTable`", &filter.Table{}, []interface{}{1, 2, 3, "haha"}, []interface{}{4, 5, 6, "hihi"}, []interface{}{1, 2, 3, "haha"}, []interface{}{1, 2, 3, "haha"}, ti.Columns, ti, tiIndex, nil), - []string{"DELETE FROM `targetSchema`.`targetTable` WHERE `id` = ? LIMIT 1", "INSERT INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)"}, + []string{"DELETE FROM `targetSchema`.`targetTable` WHERE `id` = ? 
LIMIT 1", "REPLACE INTO `targetSchema`.`targetTable` (`id`,`col1`,`col2`,`name`) VALUES (?,?,?,?)"}, [][]interface{}{{1}, {4, 5, 6, "hihi"}}, }, } @@ -438,7 +438,7 @@ func (s *testSyncerSuite) TestGenDMLWithSameOp(c *C) { newDML(insert, true, targetTableID1, sourceTable11, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti11.Columns, ti11, ti11Index, downTi11), newDML(insert, true, targetTableID1, sourceTable11, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti11.Columns, ti11, ti11Index, downTi11), newDML(insert, true, targetTableID1, sourceTable12, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti12.Columns, ti12, ti12Index, downTi12), - // update no index + // update no index but safemode newDML(update, true, targetTableID1, sourceTable11, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, []interface{}{1, 1, "a"}, []interface{}{1, 1, "aa"}, ti11.Columns, ti11, ti11Index, downTi11), newDML(update, true, targetTableID1, sourceTable11, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, []interface{}{2, 2, "b"}, []interface{}{2, 2, "bb"}, ti11.Columns, ti11, ti11Index, downTi11), newDML(update, true, targetTableID1, sourceTable12, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, []interface{}{3, 3, "c"}, []interface{}{3, 3, "cc"}, ti12.Columns, ti12, ti12Index, downTi12), @@ -486,17 +486,29 @@ func (s *testSyncerSuite) TestGenDMLWithSameOp(c *C) { expectQueries := []string{ // table1 - "INSERT INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?),(?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`name`=VALUES(`name`)", + "REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?),(?,?,?),(?,?,?)", + "DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1", + "REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)", + "DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1", + "REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)", + "DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1", + "REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)", "DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1", - "INSERT INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`name`=VALUES(`name`)", + "REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)", "DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1", - "INSERT INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`name`=VALUES(`name`)", + "REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)", "DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1", - "INSERT INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col1`=VALUES(`col1`),`name`=VALUES(`name`)", + "REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)", + "DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1", + "REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)", + "DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1", + "REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)", + "DELETE FROM `db1`.`tb1` WHERE `id` = ? LIMIT 1", + "REPLACE INTO `db1`.`tb1` (`id`,`col1`,`name`) VALUES (?,?,?)", "DELETE FROM `db1`.`tb1` WHERE (`id`) IN ((?),(?),(?))", // table2 - "INSERT INTO `db2`.`tb2` (`id`,`col2`,`name`) VALUES (?,?,?) 
ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)", + "REPLACE INTO `db2`.`tb2` (`id`,`col2`,`name`) VALUES (?,?,?)", "INSERT INTO `db2`.`tb2` (`id`,`col2`,`name`) VALUES (?,?,?)", "INSERT INTO `db2`.`tb2` (`id`,`col3`,`name`) VALUES (?,?,?)", "INSERT INTO `db2`.`tb2` (`id`,`col2`,`name`) VALUES (?,?,?),(?,?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`col2`=VALUES(`col2`),`name`=VALUES(`name`)", @@ -514,7 +526,19 @@ func (s *testSyncerSuite) TestGenDMLWithSameOp(c *C) { expectArgs := [][]interface{}{ // table1 - {1, 1, "a", 2, 2, "b", 3, 3, "c", 1, 1, "aa", 2, 2, "bb", 3, 3, "cc", 1, 4, "aa", 2, 5, "bb", 3, 6, "cc"}, + {1, 1, "a", 2, 2, "b", 3, 3, "c"}, + {1}, + {1, 1, "aa"}, + {2}, + {2, 2, "bb"}, + {3}, + {3, 3, "cc"}, + {1}, + {1, 4, "aa"}, + {2}, + {2, 5, "bb"}, + {3}, + {3, 6, "cc"}, {1}, {4, 4, "aa"}, {2}, diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index 3467cd1e8e9..b924a935546 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -867,7 +867,7 @@ func (s *testSyncerSuite) TestRun(c *C) { nil, }, { insert, - []string{"INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"}, + []string{"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"}, [][]interface{}{{int64(580981944116838401), "a"}}, }, { flush, @@ -879,7 +879,7 @@ func (s *testSyncerSuite) TestRun(c *C) { nil, }, { insert, - []string{"INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"}, + []string{"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"}, [][]interface{}{{int64(580981944116838402), "b"}}, }, { del, @@ -888,7 +888,7 @@ func (s *testSyncerSuite) TestRun(c *C) { }, { // safe mode is true, will split update to delete + replace update, - []string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", "INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"}, + []string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"}, [][]interface{}{{int64(580981944116838402)}, {int64(580981944116838401), "b"}}, }, { flush, @@ -1131,7 +1131,7 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { nil, }, { insert, - []string{"INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"}, + []string{"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"}, [][]interface{}{{int32(1), "a"}}, }, { del, @@ -1139,7 +1139,7 @@ func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { [][]interface{}{{int32(1)}}, }, { update, - []string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? LIMIT 1", "INSERT INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?) ON DUPLICATE KEY UPDATE `id`=VALUES(`id`),`name`=VALUES(`name`)"}, + []string{"DELETE FROM `test_1`.`t_1` WHERE `id` = ? 
LIMIT 1", "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)"}, [][]interface{}{{int32(2)}, {int32(1), "b"}}, }, { // start from this event, location passes safeModeExitLocation and safe mode should exit diff --git a/dm/tests/_utils/test_prepare b/dm/tests/_utils/test_prepare index 3e581f76d01..ff24c716c5c 100644 --- a/dm/tests/_utils/test_prepare +++ b/dm/tests/_utils/test_prepare @@ -241,6 +241,27 @@ function run_sql_tidb_with_retry() { fi } +# shortcut for run tidb sql and check result with retry +function run_sql_tidb_with_retry_times() { + rc=0 + for ((k=1; k<$3; k++)); do + run_sql_tidb "$1" + if grep -Fq "$2" "$TEST_DIR/sql_res.$TEST_NAME.txt"; then + rc=1 + break + fi + echo "run tidb sql failed $k-th time, retry later" + sleep 2 + done + if [[ $rc = 0 ]]; then + echo "TEST FAILED: OUTPUT DOES NOT CONTAIN '$2'" + echo "____________________________________" + cat "$TEST_DIR/sql_res.$TEST_NAME.txt" + echo "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^" + exit 1 + fi +} + # shortcut for check log contain with retry function check_log_contain_with_retry() { text=$1 diff --git a/dm/tests/shardddl1/run.sh b/dm/tests/shardddl1/run.sh index 1b16959c9c0..fa009a003b0 100644 --- a/dm/tests/shardddl1/run.sh +++ b/dm/tests/shardddl1/run.sh @@ -600,7 +600,7 @@ function DM_COMPACT() { ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true check_port_offline $WORKER1_PORT 20 check_port_offline $WORKER2_PORT 20 - export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/BlockExecuteSQLs=return(1)' + export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/BlockExecuteSQLs=return(1);github.com/pingcap/tiflow/dm/syncer/SafeModeInitPhaseSeconds=return(5)' run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT @@ -611,6 +611,54 @@ function DM_COMPACT() { "clean_table" "" } +function DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE() { + END=10 + # As this kind of sql is no use, like "update tb1 set c=1 where a=100" which is behind of "insert into tb1(a,b,c) values(100,1,1)" + # We should avoid this kind of sql to make sure the count of dmls + for i in $(seq 0 $END); do + run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b,c) values($((i + 100)),$i,$i)" + run_sql_source1 "update ${shardddl1}.${tb1} set c=20 where a=$((i + 100))" + run_sql_source1 "update ${shardddl1}.${tb1} set c=c+1 where a=$((i + 100))" + # Use downstream uk 'b' as key and this sql which modifiies 'b' will be splited to two job(delete+insert) + run_sql_source1 "update ${shardddl1}.${tb1} set b=b+1 where a=$((i + 100))" + run_sql_source1 "update ${shardddl1}.${tb1} set a=a+100 where a=$((i + 100))" + run_sql_source1 "delete from ${shardddl1}.${tb1} where a=$((i + 200))" + run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b,c) values($((i + 100)),$i,$i)" + done + run_sql_tidb_with_retry_times "select count(1) from ${shardddl}.${tb};" "count(1): 11" 30 + run_sql_tidb "create table ${shardddl}.${tb}_temp (a int primary key auto_increment, b int unique not null, c int) auto_increment = 100; + insert into ${shardddl}.${tb}_temp (a, b, c) select a, b, c from ${shardddl}.${tb}; + drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30 + compactCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "finish to compact" | wc -l) + # As compact 
is affected by "j.tp == flush", the check count of compact use "-le 50" + if [[ "$compactCnt" -le 50 ]]; then + echo "compact $compactCnt dmls which is less than 50" + exit 1 + fi +} + +function DM_COMPACT_USE_DOWNSTREAM_SCHEMA() { + # downstream pk/uk/column is diffrent with upstream, compact use downstream schema. + ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + # DownstreamIdentifyKeyCheckInCompact=return(20) will check whether the key value in compact is less than 20, if false, it will be panic. + # This goal is check whether it use downstream schema in compator. + # if use downstream schema, key will be 'b' with value less than 20. + # If use upstream schema, key will be 'a' with value greater than 100. + export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/SkipFlushCompactor=return();github.com/pingcap/tiflow/dm/syncer/DownstreamIdentifyKeyCheckInCompact=return(20)' + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + run_case COMPACT_USE_DOWNSTREAM_SCHEMA "single-source-no-sharding" \ + "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique not null, c int);\"; + run_sql_tidb \"drop database if exists ${shardddl}; create database ${shardddl}; create table ${shardddl}.${tb} (a int, b int unique not null, c int, d int primary key auto_increment) auto_increment = 100;\"" \ + "clean_table" "" +} + function DM_MULTIPLE_ROWS_CASE() { END=100 for i in $(seq 1 10 $END); do @@ -630,22 +678,56 @@ function DM_MULTIPLE_ROWS_CASE() { run_sql_source1 "delete from ${shardddl1}.${tb1} where a<=$((0 - i)) and a>$((-10 - i))" done + # wait safemode exit + check_log_contain_with_retry "disable safe-mode after task initialization finished" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log + + # insert again without safmode + for i in $(seq 1 10 $END); do + run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b) values($i,$i),($((i + 1)),$((i + 1))),($((i + 2)),$((i + 2))),($((i + 3)),$((i + 3))),($((i + 4)),$((i + 4))),\ + ($((i + 5)),$((i + 5))),($((i + 6)),$((i + 6))),($((i + 7)),$((i + 7))),($((i + 8)),$((i + 8))),($((i + 9)),$((i + 9)))" + done + for i in $(seq 1 10 $END); do + run_sql_source1 "update ${shardddl1}.${tb1} set c=1 where a>=$i and a<$((i + 10))" + done + for i in $(seq 1 10 $END); do + run_sql_source1 "update ${shardddl1}.${tb1} set b = 0 - b where a>=$i and a<$((i + 10))" + done + for i in $(seq 1 10 $END); do + run_sql_source1 "update ${shardddl1}.${tb1} set a = 0 - a where a>=$i and a<$((i + 10))" + done + for i in $(seq 1 10 $END); do + run_sql_source1 "delete from ${shardddl1}.${tb1} where a<=$((0 - i)) and a>$((-10 - i))" + done + # insert new values, otherwise there may not be any data in downstream in middle stage and check_sync_diff return true immediately - for i in $(seq 100 110 $END); do + for i in $(seq 101 10 200); do run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b) values($i,$i),($((i + 1)),$((i + 1))),($((i + 2)),$((i + 2))),($((i + 3)),$((i + 3))),($((i + 4)),$((i + 4))),\ ($((i + 5)),$((i + 5))),($((i + 6)),$((i + 6))),($((i + 7)),$((i + 7))),($((i + 8)),$((i + 8))),($((i + 9)),$((i + 9)))" done + + run_sql_tidb_with_retry "select count(1) from 
${shardddl}.${tb} where a>100 and a<=200;" "count(1): 100" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30 - insertMergeCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep '"original op"=insert' | wc -l) - updateMergeCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep '"original op"=update' | wc -l) - deleteMergeCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep '"original op"=delete' | wc -l) - if [[ "$insertMergeCnt" -le 5 || "$updateMergeCnt" -le 5 || "$deleteMergeCnt" -le 5 ]]; then - echo "merge dmls less than 5, insertMergeCnt: $insertMergeCnt, updateMergeCnt: $updateMergeCnt, deleteMergeCnt: $deleteMergeCnt" + insertMergeCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep '\[op=insert\]' | wc -l) + replaceMergeCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep '\[op=replace\]' | wc -l) + updateMergeCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep '\[op="insert on duplicate update"\]' | wc -l) + deleteMergeCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep '\[op=delete\]' | wc -l) + echo $insertMergeCnt $replaceMergeCnt $updateMergeCnt $deleteMergeCnt + if [[ "$insertMergeCnt" -le 5 || "$updateMergeCnt" -le 5 || "$deleteMergeCnt" -le 5 || "$replaceMergeCnt" -le 5 ]]; then + echo "merge dmls less than 5, insertMergeCnt: $insertMergeCnt, replaceMergeCnt: $replaceMergeCnt, updateMergeCnt: $updateMergeCnt, deleteMergeCnt: $deleteMergeCnt" exit 1 fi } function DM_MULTIPLE_ROWS() { + ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/BlockExecuteSQLs=return(1);github.com/pingcap/tiflow/dm/syncer/SafeModeInitPhaseSeconds=return(5)' + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + run_case MULTIPLE_ROWS "single-source-no-sharding" \ "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique, c int);\"" \ "clean_table" "" @@ -677,13 +759,43 @@ function DM_CAUSALITY() { "clean_table" "" } +function DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA_CASE() { + run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,2)" + run_sql_source1 "insert into ${shardddl1}.${tb1} values(2,3)" + run_sql_source1 "update ${shardddl1}.${tb1} set a=3, b=4 where b=3" + run_sql_source1 "delete from ${shardddl1}.${tb1} where a=1" + run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,3)" + + run_sql_tidb_with_retry_times "select count(1) from ${shardddl}.${tb} where a =1 and b=3;" "count(1): 1" 30 + run_sql_tidb "create table ${shardddl}.${tb}_temp (a int primary key, b int unique); + insert into ${shardddl}.${tb}_temp (a, b) select a, b from ${shardddl}.${tb}; + drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + causalityCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "meet causality key, will generate a conflict job to flush all sqls" | wc -l) + if [[ "$causalityCnt" -ne 0 ]]; then + 
echo "causalityCnt is $causalityCnt, but it should be 0" + exit 1 + fi +} + +function DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA() { + # downstream pk/uk/column is diffrent with upstream, causality use downstream schema. + run_case CAUSALITY_USE_DOWNSTREAM_SCHEMA "single-source-no-sharding" \ + "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique);\"; + run_sql_tidb \"drop database if exists ${shardddl}; create database ${shardddl}; create table ${shardddl}.${tb} (a int, b int unique, c int primary key auto_increment) auto_increment = 100;\"" \ + "clean_table" "" +} + function run() { init_cluster init_database DM_COMPACT + DM_COMPACT_USE_DOWNSTREAM_SCHEMA DM_MULTIPLE_ROWS DM_CAUSALITY + DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA DM_UpdateBARule DM_RENAME_TABLE DM_RENAME_COLUMN_OPTIMISTIC @@ -697,6 +809,7 @@ function run() { DM_${i} sleep 1 done + } cleanup_data $shardddl
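
For reference, a minimal standalone Go sketch of the two behaviours this change relies on (the type and function names here are illustrative stand-ins, not the DM syncer code): a compacted DELETE + INSERT inherits the earlier job's safe-mode flag, and a safe-mode insert is emitted as REPLACE INTO rather than INSERT ... ON DUPLICATE KEY UPDATE, so it cannot hit a duplicate-entry error downstream.

package main

import (
	"fmt"
	"strings"
)

// dml is a simplified stand-in for the syncer's DML job payload.
type dml struct {
	op       string // "insert", "update" or "delete"
	safeMode bool
	table    string
	columns  []string
}

// compactDeleteInsert mirrors the compactor fix: when DELETE + INSERT on the
// same key are merged into a single INSERT, the merged job keeps the previous
// job's safe-mode flag.
func compactDeleteInsert(prev, cur dml) dml {
	if prev.op == "delete" && cur.op == "insert" {
		cur.safeMode = prev.safeMode // DELETE + INSERT => INSERT with safemode
	}
	return cur
}

// genInsertSQL mirrors the generator change: REPLACE in safe mode, plain INSERT otherwise.
func genInsertSQL(d dml) string {
	var b strings.Builder
	if d.safeMode {
		b.WriteString("REPLACE INTO ")
	} else {
		b.WriteString("INSERT INTO ")
	}
	b.WriteString(d.table)
	b.WriteString(" (`" + strings.Join(d.columns, "`,`") + "`) VALUES (")
	b.WriteString(strings.TrimSuffix(strings.Repeat("?,", len(d.columns)), ","))
	b.WriteString(")")
	return b.String()
}

func main() {
	prev := dml{op: "delete", safeMode: true, table: "`test_1`.`t_1`"}
	cur := dml{op: "insert", table: "`test_1`.`t_1`", columns: []string{"id", "name"}}
	fmt.Println(genInsertSQL(compactDeleteInsert(prev, cur)))
	// REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)
}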