diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index e690a7fe96b..d720bba95fc 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -33,6 +33,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/pipeline" pmessage "github.com/pingcap/tiflow/pkg/pipeline/message" + "github.com/pingcap/tiflow/pkg/retry" "github.com/tikv/client-go/v2/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" @@ -191,15 +192,27 @@ func (n *sorterNode) start( case <-stdCtx.Done(): return nil case startTs = <-n.startTsCh: - phy, logic, err := n.pdClient.GetTS(ctx) + backoffBaseDelayInMs := int64(100) + totalRetryDuration := 10 * time.Second + start := time.Now() + err := retry.Do(stdCtx, func() error { + phy, logic, err := n.pdClient.GetTS(ctx) + if err != nil { + return errors.Trace(err) + } + replicateTs = oracle.ComposeTS(phy, logic) + return nil + }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), + retry.WithTotalRetryDuratoin(totalRetryDuration), + retry.WithIsRetryableErr(cerror.IsRetryableError)) if err != nil { return errors.Trace(err) } - replicateTs = oracle.ComposeTS(phy, logic) log.Info("table is replicating", zap.Int64("tableID", n.tableID), zap.String("tableName", n.tableName), zap.Uint64("replicateTs", replicateTs), + zap.Duration("duration", time.Since(start)), zap.String("namespace", n.changefeed.Namespace), zap.String("changefeed", n.changefeed.ID)) } diff --git a/cdc/sink/mysql/mysql.go b/cdc/sink/mysql/mysql.go index fae95639b0b..e12b0a1e3da 100644 --- a/cdc/sink/mysql/mysql.go +++ b/cdc/sink/mysql/mysql.go @@ -300,11 +300,15 @@ func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEve return retry.Do(ctx, func() error { err := s.execDDL(ctx, ddl) if errorutil.IsIgnorableMySQLDDLError(err) { - log.Info("execute DDL failed, but error can be ignored", zap.String("query", ddl.Query), zap.Error(err)) + log.Info("execute DDL failed, but error can be ignored", + zap.Uint64("startTs", ddl.StartTs), zap.String("ddl", ddl.Query), + zap.Error(err)) return nil } if err != nil { - log.Warn("execute DDL with error, retry later", zap.String("query", ddl.Query), zap.Error(err)) + log.Warn("execute DDL with error, retry later", + zap.Uint64("startTs", ddl.StartTs), zap.String("ddl", ddl.Query), + zap.Error(err)) } return err }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), @@ -599,12 +603,14 @@ func (s *mysqlSink) getTableResolvedTs(tableID model.TableID) (model.ResolvedTs, } func logDMLTxnErr( - err error, start time.Time, changefeed model.ChangeFeedID, query string, count int, + err error, start time.Time, changefeed model.ChangeFeedID, + query string, count int, startTs []model.Ts, ) error { if isRetryableDMLError(err) { log.Warn("execute DMLs with error, retry later", zap.Error(err), zap.Duration("duration", time.Since(start)), zap.String("query", query), zap.Int("count", count), + zap.Uint64s("startTs", startTs), zap.String("namespace", changefeed.Namespace), zap.String("changefeed", changefeed.ID)) } else { @@ -642,7 +648,7 @@ func (s *mysqlSink) execDMLWithMaxRetries(ctx context.Context, dmls *preparedDML failpoint.Return( logDMLTxnErr( errors.Trace(driver.ErrBadConn), - start, s.params.changefeedID, "failpoint", 0)) + start, s.params.changefeedID, "failpoint", 0, nil)) }) failpoint.Inject("MySQLSinkHangLongTime", func() { time.Sleep(time.Hour) @@ -652,7 +658,7 @@ func (s *mysqlSink) execDMLWithMaxRetries(ctx context.Context, dmls *preparedDML if err != nil { return 0, logDMLTxnErr( cerror.WrapError(cerror.ErrMySQLTxnError, err), - start, s.params.changefeedID, "BEGIN", dmls.rowCount) + start, s.params.changefeedID, "BEGIN", dmls.rowCount, dmls.startTs) } for i, query := range dmls.sqls { @@ -663,30 +669,18 @@ func (s *mysqlSink) execDMLWithMaxRetries(ctx context.Context, dmls *preparedDML log.Warn("failed to rollback txn", zap.Error(err)) _ = logDMLTxnErr( cerror.WrapError(cerror.ErrMySQLTxnError, err), - start, s.params.changefeedID, query, dmls.rowCount) + start, s.params.changefeedID, query, dmls.rowCount, dmls.startTs) } return 0, logDMLTxnErr( cerror.WrapError(cerror.ErrMySQLTxnError, err), - start, s.params.changefeedID, query, dmls.rowCount) - } - } - - if len(dmls.markSQL) != 0 { - log.Debug("exec row", zap.String("sql", dmls.markSQL)) - if _, err := tx.ExecContext(ctx, dmls.markSQL); err != nil { - if rbErr := tx.Rollback(); rbErr != nil { - log.Warn("failed to rollback txn", zap.Error(err)) - } - return 0, logDMLTxnErr( - cerror.WrapError(cerror.ErrMySQLTxnError, err), - start, s.params.changefeedID, dmls.markSQL, dmls.rowCount) + start, s.params.changefeedID, query, dmls.rowCount, dmls.startTs) } } if err = tx.Commit(); err != nil { return 0, logDMLTxnErr( cerror.WrapError(cerror.ErrMySQLTxnError, err), - start, s.params.changefeedID, "COMMIT", dmls.rowCount) + start, s.params.changefeedID, "COMMIT", dmls.rowCount, dmls.startTs) } return dmls.rowCount, nil }) @@ -706,14 +700,15 @@ func (s *mysqlSink) execDMLWithMaxRetries(ctx context.Context, dmls *preparedDML } type preparedDMLs struct { + startTs []model.Ts sqls []string values [][]interface{} - markSQL string rowCount int } // prepareDMLs converts model.RowChangedEvent list to query string list and args list func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, bucket int) *preparedDMLs { + startTs := make([]model.Ts, 0, 1) sqls := make([]string, 0, len(rows)) values := make([][]interface{}, 0, len(rows)) replaces := make(map[string][][]interface{}) @@ -744,6 +739,10 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, bucket int) *prep var query string var args []interface{} quoteTable := quotes.QuoteSchema(row.Table.Schema, row.Table.Table) + if len(startTs) == 0 || // Always add the first row's start ts. + startTs[len(startTs)-1] != row.StartTs { // Try to deduplicate starts ts. + startTs = append(startTs, row.StartTs) + } // If the old value is enabled, is not in safe mode and is an update event, then translate to UPDATE. // NOTICE: Only update events with the old value feature enabled will have both columns and preColumns. @@ -805,10 +804,11 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, bucket int) *prep flushCacheDMLs() dmls := &preparedDMLs{ - sqls: sqls, - values: values, + startTs: startTs, + sqls: sqls, + values: values, + rowCount: rowCount, } - dmls.rowCount = rowCount return dmls } diff --git a/cdc/sink/mysql/mysql_test.go b/cdc/sink/mysql/mysql_test.go index 0920ef56450..11de1654d82 100644 --- a/cdc/sink/mysql/mysql_test.go +++ b/cdc/sink/mysql/mysql_test.go @@ -60,8 +60,12 @@ func TestPrepareDML(t *testing.T) { expected *preparedDMLs }{ { - input: []*model.RowChangedEvent{}, - expected: &preparedDMLs{sqls: []string{}, values: [][]interface{}{}}, + input: []*model.RowChangedEvent{}, + expected: &preparedDMLs{ + startTs: []model.Ts{}, + sqls: []string{}, + values: [][]interface{}{}, + }, }, { input: []*model.RowChangedEvent{ { @@ -83,6 +87,7 @@ func TestPrepareDML(t *testing.T) { }, }, expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813514}, sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"}, values: [][]interface{}{{1, 1}}, rowCount: 1, @@ -108,6 +113,7 @@ func TestPrepareDML(t *testing.T) { }, }, expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813516}, sqls: []string{"REPLACE INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);"}, values: [][]interface{}{{2, 2}}, rowCount: 1, @@ -2163,9 +2169,13 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { expected *preparedDMLs }{ { - name: "empty", - input: []*model.RowChangedEvent{}, - expected: &preparedDMLs{sqls: []string{}, values: [][]interface{}{}}, + name: "empty", + input: []*model.RowChangedEvent{}, + expected: &preparedDMLs{ + startTs: []model.Ts{}, + sqls: []string{}, + values: [][]interface{}{}, + }, }, { name: "insert without PK", input: []*model.RowChangedEvent{ @@ -2188,6 +2198,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { }, }, expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813514}, sqls: []string{ "INSERT INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);", }, @@ -2216,6 +2227,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { }, }, expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813514}, sqls: []string{"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);"}, values: [][]interface{}{{1, 1}}, rowCount: 1, @@ -2253,6 +2265,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { }, }, expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813516}, sqls: []string{ "UPDATE `common_1`.`uk_without_pk` SET `a1`=?,`a3`=? " + "WHERE `a1`=? AND `a3`=? LIMIT 1;", @@ -2293,6 +2306,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { }, }, expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813516}, sqls: []string{"UPDATE `common_1`.`pk` SET `a1`=?,`a3`=? " + "WHERE `a1`=? AND `a3`=? LIMIT 1;"}, values: [][]interface{}{{3, 3, 2, 2}}, @@ -2337,6 +2351,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { }, }, expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813516}, sqls: []string{ "INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);", "INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);", @@ -2366,6 +2381,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { }, }, expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813516}, sqls: []string{ "REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);", }, @@ -2393,9 +2409,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { }}, }, { - StartTs: 418658114257813516, - CommitTs: 418658114257813517, - ReplicatingTs: 418658114257813515, + StartTs: 418658114257813506, + CommitTs: 418658114257813507, + ReplicatingTs: 418658114257813505, Table: &model.TableName{Schema: "common_1", Table: "pk"}, Columns: []*model.Column{nil, { Name: "a1", @@ -2411,6 +2427,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) { }, }, expected: &preparedDMLs{ + startTs: []model.Ts{418658114257813516, 418658114257813506}, sqls: []string{ "REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);", "REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",