Skip to content

Commit

Permalink
sink(ticdc): only enable batch dml for more than one row (#10971) (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored May 23, 2024
1 parent b138d82 commit f8a1dbc
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 17 deletions.
6 changes: 3 additions & 3 deletions cdc/sink/dmlsink/txn/mysql/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic
}
for i, column := range columnNames {
if i == len(columnNames)-1 {
builder.WriteString("`" + quotes.EscapeName(column) + "`=?")
builder.WriteString("`" + quotes.EscapeName(column) + "` = ?")
} else {
builder.WriteString("`" + quotes.EscapeName(column) + "`=?,")
builder.WriteString("`" + quotes.EscapeName(column) + "` = ?, ")
}
}

Expand All @@ -60,7 +60,7 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic
if wargs[i] == nil {
builder.WriteString(quotes.QuoteName(colNames[i]) + " IS NULL")
} else {
builder.WriteString(quotes.QuoteName(colNames[i]) + "=?")
builder.WriteString(quotes.QuoteName(colNames[i]) + " = ?")
args = append(args, wargs[i])
}
}
Expand Down
10 changes: 5 additions & 5 deletions cdc/sink/dmlsink/txn/mysql/dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestPrepareUpdate(t *testing.T) {
},
{Name: "b", Type: mysql.TypeVarchar, Flag: 0, Value: "test2"},
},
expectedSQL: "UPDATE `test`.`t1` SET `a`=?,`b`=? WHERE `a`=? LIMIT 1",
expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? LIMIT 1",
expectedArgs: []interface{}{1, "test2", 1},
},
{
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestPrepareUpdate(t *testing.T) {
Value: 100,
},
},
expectedSQL: "UPDATE `test`.`t1` SET `a`=?,`b`=? WHERE `a`=? AND `b`=? LIMIT 1",
expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? AND `b` = ? LIMIT 1",
expectedArgs: []interface{}{2, "test2", 1, "test"},
},
{
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestPrepareUpdate(t *testing.T) {
Value: 100,
},
},
expectedSQL: "UPDATE `test`.`t1` SET `a`=?,`b`=? WHERE `a`=? AND `b`=? LIMIT 1",
expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? AND `b` = ? LIMIT 1",
expectedArgs: []interface{}{2, []byte("世界"), 1, []byte("你好")},
},
{
Expand Down Expand Up @@ -198,7 +198,7 @@ func TestPrepareUpdate(t *testing.T) {
Value: 100,
},
},
expectedSQL: "UPDATE `test`.`t1` SET `a`=?,`b`=? WHERE `a`=? AND `b`=? LIMIT 1",
expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? AND `b` = ? LIMIT 1",
expectedArgs: []interface{}{2, []byte("世界"), 1, []byte("你好")},
},
{
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestPrepareUpdate(t *testing.T) {
Value: 100,
},
},
expectedSQL: "UPDATE `test`.`t1` SET `a`=?,`b`=? WHERE `a`=? AND `b`=? LIMIT 1",
expectedSQL: "UPDATE `test`.`t1` SET `a` = ?, `b` = ? WHERE `a` = ? AND `b` = ? LIMIT 1",
expectedArgs: []interface{}{2, "世界", 1, "你好"},
},
}
Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,10 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs {
callbacks = append(callbacks, event.Callback)
}

// TODO: find a better threshold
enableBatchModeThreshold := 1
// Determine whether to use batch dml feature here.
if s.cfg.BatchDMLEnable {
if s.cfg.BatchDMLEnable && len(event.Event.Rows) > enableBatchModeThreshold {
tableColumns := firstRow.Columns
if firstRow.IsDelete() {
tableColumns = firstRow.PreColumns
Expand Down
12 changes: 6 additions & 6 deletions cdc/sink/dmlsink/txn/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,12 +976,12 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813516},
sqls: []string{
"UPDATE `common_1`.`uk_without_pk` SET `a1`=?,`a3`=? " +
"WHERE `a1`=? AND `a3`=? LIMIT 1",
"UPDATE `common_1`.`uk_without_pk` SET `a1` = ?, `a3` = ? " +
"WHERE `a1` = ? AND `a3` = ? LIMIT 1",
},
values: [][]interface{}{{3, 3, 2, 2}},
rowCount: 1,
approximateSize: 83,
approximateSize: 92,
},
}, {
name: "update with PK",
Expand Down Expand Up @@ -1017,11 +1017,11 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813516},
sqls: []string{"UPDATE `common_1`.`pk` SET `a1`=?,`a3`=? " +
"WHERE `a1`=? AND `a3`=? LIMIT 1"},
sqls: []string{"UPDATE `common_1`.`pk` SET `a1` = ?, `a3` = ? " +
"WHERE `a1` = ? AND `a3` = ? LIMIT 1"},
values: [][]interface{}{{3, 3, 2, 2}},
rowCount: 1,
approximateSize: 72,
approximateSize: 81,
},
}, {
name: "batch insert with PK",
Expand Down
4 changes: 2 additions & 2 deletions pkg/applier/redo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ func getMockDB(t *testing.T) *sql.DB {

// First, apply row which commitTs equal to resolvedTs
mock.ExpectBegin()
mock.ExpectExec("DELETE FROM `test`.`t1` WHERE (`a`,`b`) IN ((?,?))").
WithArgs(1, "2").
mock.ExpectExec("DELETE FROM `test`.`t1` WHERE `a` = ? LIMIT 1").
WithArgs(1).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("REPLACE INTO `test`.`t1` (`a`,`b`) VALUES (?,?)").
WithArgs(2, "3").
Expand Down

0 comments on commit f8a1dbc

Please sign in to comment.