Skip to content

Commit

Permalink
sink(ticdc): add start ts to mysql log and refine get tso (#6462)
Browse files Browse the repository at this point in the history
close #6460
  • Loading branch information
overvenus authored Jul 26, 2022
1 parent bd0cc36 commit 7a6f9f0
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 34 deletions.
17 changes: 15 additions & 2 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pipeline"
pmessage "github.com/pingcap/tiflow/pkg/pipeline/message"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
Expand Down Expand Up @@ -191,15 +192,27 @@ func (n *sorterNode) start(
case <-stdCtx.Done():
return nil
case startTs = <-n.startTsCh:
phy, logic, err := n.pdClient.GetTS(ctx)
backoffBaseDelayInMs := int64(100)
totalRetryDuration := 10 * time.Second
start := time.Now()
err := retry.Do(stdCtx, func() error {
phy, logic, err := n.pdClient.GetTS(ctx)
if err != nil {
return errors.Trace(err)
}
replicateTs = oracle.ComposeTS(phy, logic)
return nil
}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs),
retry.WithTotalRetryDuratoin(totalRetryDuration),
retry.WithIsRetryableErr(cerror.IsRetryableError))
if err != nil {
return errors.Trace(err)
}
replicateTs = oracle.ComposeTS(phy, logic)
log.Info("table is replicating",
zap.Int64("tableID", n.tableID),
zap.String("tableName", n.tableName),
zap.Uint64("replicateTs", replicateTs),
zap.Duration("duration", time.Since(start)),
zap.String("namespace", n.changefeed.Namespace),
zap.String("changefeed", n.changefeed.ID))
}
Expand Down
48 changes: 24 additions & 24 deletions cdc/sink/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,15 @@ func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEve
return retry.Do(ctx, func() error {
err := s.execDDL(ctx, ddl)
if errorutil.IsIgnorableMySQLDDLError(err) {
log.Info("execute DDL failed, but error can be ignored", zap.String("query", ddl.Query), zap.Error(err))
log.Info("execute DDL failed, but error can be ignored",
zap.Uint64("startTs", ddl.StartTs), zap.String("ddl", ddl.Query),
zap.Error(err))
return nil
}
if err != nil {
log.Warn("execute DDL with error, retry later", zap.String("query", ddl.Query), zap.Error(err))
log.Warn("execute DDL with error, retry later",
zap.Uint64("startTs", ddl.StartTs), zap.String("ddl", ddl.Query),
zap.Error(err))
}
return err
}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs),
Expand Down Expand Up @@ -599,12 +603,14 @@ func (s *mysqlSink) getTableResolvedTs(tableID model.TableID) (model.ResolvedTs,
}

func logDMLTxnErr(
err error, start time.Time, changefeed model.ChangeFeedID, query string, count int,
err error, start time.Time, changefeed model.ChangeFeedID,
query string, count int, startTs []model.Ts,
) error {
if isRetryableDMLError(err) {
log.Warn("execute DMLs with error, retry later",
zap.Error(err), zap.Duration("duration", time.Since(start)),
zap.String("query", query), zap.Int("count", count),
zap.Uint64s("startTs", startTs),
zap.String("namespace", changefeed.Namespace),
zap.String("changefeed", changefeed.ID))
} else {
Expand Down Expand Up @@ -642,7 +648,7 @@ func (s *mysqlSink) execDMLWithMaxRetries(ctx context.Context, dmls *preparedDML
failpoint.Return(
logDMLTxnErr(
errors.Trace(driver.ErrBadConn),
start, s.params.changefeedID, "failpoint", 0))
start, s.params.changefeedID, "failpoint", 0, nil))
})
failpoint.Inject("MySQLSinkHangLongTime", func() {
time.Sleep(time.Hour)
Expand All @@ -652,7 +658,7 @@ func (s *mysqlSink) execDMLWithMaxRetries(ctx context.Context, dmls *preparedDML
if err != nil {
return 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, "BEGIN", dmls.rowCount)
start, s.params.changefeedID, "BEGIN", dmls.rowCount, dmls.startTs)
}

for i, query := range dmls.sqls {
Expand All @@ -663,30 +669,18 @@ func (s *mysqlSink) execDMLWithMaxRetries(ctx context.Context, dmls *preparedDML
log.Warn("failed to rollback txn", zap.Error(err))
_ = logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, query, dmls.rowCount)
start, s.params.changefeedID, query, dmls.rowCount, dmls.startTs)
}
return 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, query, dmls.rowCount)
}
}

if len(dmls.markSQL) != 0 {
log.Debug("exec row", zap.String("sql", dmls.markSQL))
if _, err := tx.ExecContext(ctx, dmls.markSQL); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
log.Warn("failed to rollback txn", zap.Error(err))
}
return 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, dmls.markSQL, dmls.rowCount)
start, s.params.changefeedID, query, dmls.rowCount, dmls.startTs)
}
}

if err = tx.Commit(); err != nil {
return 0, logDMLTxnErr(
cerror.WrapError(cerror.ErrMySQLTxnError, err),
start, s.params.changefeedID, "COMMIT", dmls.rowCount)
start, s.params.changefeedID, "COMMIT", dmls.rowCount, dmls.startTs)
}
return dmls.rowCount, nil
})
Expand All @@ -706,14 +700,15 @@ func (s *mysqlSink) execDMLWithMaxRetries(ctx context.Context, dmls *preparedDML
}

type preparedDMLs struct {
startTs []model.Ts
sqls []string
values [][]interface{}
markSQL string
rowCount int
}

// prepareDMLs converts model.RowChangedEvent list to query string list and args list
func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, bucket int) *preparedDMLs {
startTs := make([]model.Ts, 0, 1)
sqls := make([]string, 0, len(rows))
values := make([][]interface{}, 0, len(rows))
replaces := make(map[string][][]interface{})
Expand Down Expand Up @@ -744,6 +739,10 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, bucket int) *prep
var query string
var args []interface{}
quoteTable := quotes.QuoteSchema(row.Table.Schema, row.Table.Table)
if len(startTs) == 0 || // Always add the first row's start ts.
startTs[len(startTs)-1] != row.StartTs { // Try to deduplicate starts ts.
startTs = append(startTs, row.StartTs)
}

// If the old value is enabled, is not in safe mode and is an update event, then translate to UPDATE.
// NOTICE: Only update events with the old value feature enabled will have both columns and preColumns.
Expand Down Expand Up @@ -805,10 +804,11 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, bucket int) *prep
flushCacheDMLs()

dmls := &preparedDMLs{
sqls: sqls,
values: values,
startTs: startTs,
sqls: sqls,
values: values,
rowCount: rowCount,
}
dmls.rowCount = rowCount
return dmls
}

Expand Down
33 changes: 25 additions & 8 deletions cdc/sink/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ func TestPrepareDML(t *testing.T) {
expected *preparedDMLs
}{
{
input: []*model.RowChangedEvent{},
expected: &preparedDMLs{sqls: []string{}, values: [][]interface{}{}},
input: []*model.RowChangedEvent{},
expected: &preparedDMLs{
startTs: []model.Ts{},
sqls: []string{},
values: [][]interface{}{},
},
}, {
input: []*model.RowChangedEvent{
{
Expand All @@ -83,6 +87,7 @@ func TestPrepareDML(t *testing.T) {
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813514},
sqls: []string{"DELETE FROM `common_1`.`uk_without_pk` WHERE `a1` = ? AND `a3` = ? LIMIT 1;"},
values: [][]interface{}{{1, 1}},
rowCount: 1,
Expand All @@ -108,6 +113,7 @@ func TestPrepareDML(t *testing.T) {
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813516},
sqls: []string{"REPLACE INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);"},
values: [][]interface{}{{2, 2}},
rowCount: 1,
Expand Down Expand Up @@ -2163,9 +2169,13 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
expected *preparedDMLs
}{
{
name: "empty",
input: []*model.RowChangedEvent{},
expected: &preparedDMLs{sqls: []string{}, values: [][]interface{}{}},
name: "empty",
input: []*model.RowChangedEvent{},
expected: &preparedDMLs{
startTs: []model.Ts{},
sqls: []string{},
values: [][]interface{}{},
},
}, {
name: "insert without PK",
input: []*model.RowChangedEvent{
Expand All @@ -2188,6 +2198,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813514},
sqls: []string{
"INSERT INTO `common_1`.`uk_without_pk`(`a1`,`a3`) VALUES (?,?);",
},
Expand Down Expand Up @@ -2216,6 +2227,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813514},
sqls: []string{"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);"},
values: [][]interface{}{{1, 1}},
rowCount: 1,
Expand Down Expand Up @@ -2253,6 +2265,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813516},
sqls: []string{
"UPDATE `common_1`.`uk_without_pk` SET `a1`=?,`a3`=? " +
"WHERE `a1`=? AND `a3`=? LIMIT 1;",
Expand Down Expand Up @@ -2293,6 +2306,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813516},
sqls: []string{"UPDATE `common_1`.`pk` SET `a1`=?,`a3`=? " +
"WHERE `a1`=? AND `a3`=? LIMIT 1;"},
values: [][]interface{}{{3, 3, 2, 2}},
Expand Down Expand Up @@ -2337,6 +2351,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813516},
sqls: []string{
"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
"INSERT INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
Expand Down Expand Up @@ -2366,6 +2381,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813516},
sqls: []string{
"REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
},
Expand Down Expand Up @@ -2393,9 +2409,9 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
}},
},
{
StartTs: 418658114257813516,
CommitTs: 418658114257813517,
ReplicatingTs: 418658114257813515,
StartTs: 418658114257813506,
CommitTs: 418658114257813507,
ReplicatingTs: 418658114257813505,
Table: &model.TableName{Schema: "common_1", Table: "pk"},
Columns: []*model.Column{nil, {
Name: "a1",
Expand All @@ -2411,6 +2427,7 @@ func TestMysqlSinkSafeModeOff(t *testing.T) {
},
},
expected: &preparedDMLs{
startTs: []model.Ts{418658114257813516, 418658114257813506},
sqls: []string{
"REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
"REPLACE INTO `common_1`.`pk`(`a1`,`a3`) VALUES (?,?);",
Expand Down

0 comments on commit 7a6f9f0

Please sign in to comment.