diff --git a/cdc/sink/dmlsink/txn/mysql/dml.go b/cdc/sink/dmlsink/txn/mysql/dml.go index 38dc541fca4..a7aa6b9cea4 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml.go +++ b/cdc/sink/dmlsink/txn/mysql/dml.go @@ -42,9 +42,9 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic } for i, column := range columnNames { if i == len(columnNames)-1 { - builder.WriteString("`" + quotes.EscapeName(column) + "`=?") + builder.WriteString("`" + quotes.EscapeName(column) + "` = ?") } else { - builder.WriteString("`" + quotes.EscapeName(column) + "`=?,") + builder.WriteString("`" + quotes.EscapeName(column) + "` = ?, ") } } @@ -60,7 +60,7 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic if wargs[i] == nil { builder.WriteString(quotes.QuoteName(colNames[i]) + " IS NULL") } else { - builder.WriteString(quotes.QuoteName(colNames[i]) + "=?") + builder.WriteString(quotes.QuoteName(colNames[i]) + " = ?") args = append(args, wargs[i]) } } diff --git a/cdc/sink/dmlsink/txn/mysql/dml_test.go b/cdc/sink/dmlsink/txn/mysql/dml_test.go index 937fe3bb92a..0bfdd1fbb77 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml_test.go +++ b/cdc/sink/dmlsink/txn/mysql/dml_test.go @@ -63,7 +63,7 @@ func TestPrepareUpdate(t *testing.T) { }, {Name: "b", Type: mysql.TypeVarchar, Flag: 0, Value: "test2"}, }, - expectedSQL: "UPDATE `test`.`t1` SET `a`=?,`b`=? WHERE `a`=? LIMIT 1", + expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? LIMIT 1", expectedArgs: []interface{}{1, "test2", 1}, }, { @@ -107,7 +107,7 @@ func TestPrepareUpdate(t *testing.T) { Value: 100, }, }, - expectedSQL: "UPDATE `test`.`t1` SET `a`=?,`b`=? WHERE `a`=? AND `b`=? LIMIT 1", + expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? AND `b` = ? LIMIT 1", expectedArgs: []interface{}{2, "test2", 1, "test"}, }, { @@ -151,7 +151,7 @@ func TestPrepareUpdate(t *testing.T) { Value: 100, }, }, - expectedSQL: "UPDATE `test`.`t1` SET `a`=?,`b`=? WHERE `a`=? AND `b`=? LIMIT 1", + expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? AND `b` = ? LIMIT 1", expectedArgs: []interface{}{2, []byte("世界"), 1, []byte("你好")}, }, { @@ -198,7 +198,7 @@ func TestPrepareUpdate(t *testing.T) { Value: 100, }, }, - expectedSQL: "UPDATE `test`.`t1` SET `a`=?,`b`=? WHERE `a`=? AND `b`=? LIMIT 1", + expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? AND `b` = ? LIMIT 1", expectedArgs: []interface{}{2, []byte("世界"), 1, []byte("你好")}, }, { @@ -245,7 +245,7 @@ func TestPrepareUpdate(t *testing.T) { Value: 100, }, }, - expectedSQL: "UPDATE `test`.`t1` SET `a`=?,`b`=? WHERE `a`=? AND `b`=? LIMIT 1", + expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? AND `b` = ? LIMIT 1", expectedArgs: []interface{}{2, "世界", 1, "你好"}, }, } diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 97d9bd07171..61a8178a050 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -567,8 +567,10 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { callbacks = append(callbacks, event.Callback) } + // TODO: find a better threshold + enableBatchModeThreshold := 1 // Determine whether to use batch dml feature here. - if s.cfg.BatchDMLEnable { + if s.cfg.BatchDMLEnable && len(event.Event.Rows) > enableBatchModeThreshold { tableColumns := firstRow.Columns if firstRow.IsDelete() { tableColumns = firstRow.PreColumns diff --git a/cdc/sink/dmlsink/txn/mysql/mysql_test.go b/cdc/sink/dmlsink/txn/mysql/mysql_test.go index 90d19c5f246..78dc4b58770 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql_test.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql_test.go @@ -976,12 +976,12 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { expected: &preparedDMLs{ startTs: []model.Ts{418658114257813516}, sqls: []string{ - "UPDATE `common_1`.`uk_without_pk` SET `a1`=?,`a3`=? " + - "WHERE `a1`=? AND `a3`=? LIMIT 1", + "UPDATE `common_1`.`uk_without_pk` SET `a1` = ?, `a3` = ? " + + "WHERE `a1` = ? AND `a3` = ? LIMIT 1", }, values: [][]interface{}{{3, 3, 2, 2}}, rowCount: 1, - approximateSize: 83, + approximateSize: 92, }, }, { name: "update with PK", @@ -1017,11 +1017,11 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { }, expected: &preparedDMLs{ startTs: []model.Ts{418658114257813516}, - sqls: []string{"UPDATE `common_1`.`pk` SET `a1`=?,`a3`=? " + - "WHERE `a1`=? AND `a3`=? LIMIT 1"}, + sqls: []string{"UPDATE `common_1`.`pk` SET `a1` = ?, `a3` = ? " + + "WHERE `a1` = ? AND `a3` = ? LIMIT 1"}, values: [][]interface{}{{3, 3, 2, 2}}, rowCount: 1, - approximateSize: 72, + approximateSize: 81, }, }, { name: "batch insert with PK", diff --git a/pkg/applier/redo_test.go b/pkg/applier/redo_test.go index 3c37bb4bb63..7262c967258 100644 --- a/pkg/applier/redo_test.go +++ b/pkg/applier/redo_test.go @@ -259,8 +259,8 @@ func getMockDB(t *testing.T) *sql.DB { // First, apply row which commitTs equal to resolvedTs mock.ExpectBegin() - mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a`,`b`) IN ((?,?))"). - WithArgs(1, "2"). + mock.ExpectExec("DELETE FROM `test`.`t1` WHERE `a` = ? LIMIT 1"). + WithArgs(1). WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)"). WithArgs(2, "3").