Skip to content

Commit

Permalink
*(ticdc): split old update kv entry after restarting changefeed (#10919)
Browse files Browse the repository at this point in the history
close #10918
  • Loading branch information
lidezhu authored May 6, 2024
1 parent be15534 commit c710066
Show file tree
Hide file tree
Showing 25 changed files with 1,004 additions and 106 deletions.
5 changes: 5 additions & 0 deletions cdc/model/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ type RawKVEntry struct {
RegionID uint64 `msg:"region_id"`
}

// IsUpdate reports whether the entry describes an update, i.e. a put
// operation that carries both a previous value and a new value.
func (v *RawKVEntry) IsUpdate() bool {
	if v.OpType != OpTypePut {
		return false
	}
	return v.OldValue != nil && v.Value != nil
}

func (v *RawKVEntry) String() string {
// TODO: redact values.
return fmt.Sprintf(
Expand Down
47 changes: 35 additions & 12 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,30 @@ type RowChangedEventInRedoLog struct {
IndexColumns [][]int `msg:"index-columns"`
}

// ToRowChangedEvent rebuilds a RowChangedEvent from its redo log form.
//
// The table info is built from the schema name, table name, index columns and
// one column set of the redo log row, then both Columns and PreColumns are
// converted against that table info.
func (r *RowChangedEventInRedoLog) ToRowChangedEvent() *RowChangedEvent {
	// Use Columns to build the table info; fall back to PreColumns when
	// Columns is nil.
	schemaCols := r.Columns
	if schemaCols == nil {
		schemaCols = r.PreColumns
	}

	info := BuildTableInfo(r.Table.Schema, r.Table.Table, schemaCols, r.IndexColumns)
	info.TableName.TableID = r.Table.TableID
	info.TableName.IsPartition = r.Table.IsPartition

	return &RowChangedEvent{
		StartTs:         r.StartTs,
		CommitTs:        r.CommitTs,
		PhysicalTableID: r.Table.TableID,
		TableInfo:       info,
		Columns:         Columns2ColumnDatas(r.Columns, info),
		PreColumns:      Columns2ColumnDatas(r.PreColumns, info),
	}
}

// txnRows represents a set of events that belong to the same transaction.
type txnRows []*RowChangedEvent

Expand Down Expand Up @@ -1125,18 +1149,17 @@ func (t *SingleTableTxn) TrySplitAndSortUpdateEvent(scheme string) error {

// shouldSplitUpdateEvent reports whether the update events of this transaction
// should be split into delete and insert events for the given sink scheme.
// The decision depends only on sinkScheme, not on the number of rows in t.
//
// For the MySQL Sink, we don't split any update event.
// This may cause error like "duplicate entry" when sink to the downstream.
// This kind of error will cause the changefeed to restart,
// and then the related update rows will be split to insert and delete at puller side.
//
// For every other sink (e.g. Kafka and Storage), always split a single unique
// key changed update event, since:
// 1. Avro and CSV does not output the previous column values for the update event, so it would
// cause consumer missing data if the unique key changed event is not split.
// 2. Index-Value Dispatcher cannot work correctly if the unique key changed event isn't split.
func (t *SingleTableTxn) shouldSplitUpdateEvent(sinkScheme string) bool {
	return !sink.IsMySQLCompatibleScheme(sinkScheme)
}

// trySplitAndSortUpdateEvent try to split update events if unique key is updated
Expand Down Expand Up @@ -1166,8 +1189,8 @@ func trySplitAndSortUpdateEvent(

// This indicates that it is an update event. if the pk or uk is updated,
// we need to split it into two events (delete and insert).
if e.IsUpdate() && shouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := splitUpdateEvent(e)
if e.IsUpdate() && ShouldSplitUpdateEvent(e) {
deleteEvent, insertEvent, err := SplitUpdateEvent(e)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -1192,10 +1215,10 @@ func isNonEmptyUniqueOrHandleCol(col *ColumnData, tableInfo *TableInfo) bool {
return false
}

// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// ShouldSplitUpdateEvent determines if the split event is needed to align the old format based on
// whether the handle key column or unique key has been modified.
// If it is modified, we need to use SplitUpdateEvent to split the update event into a delete and an insert event.
func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
func ShouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
// nil event will never be split.
if updateEvent == nil {
return false
Expand All @@ -1217,8 +1240,8 @@ func shouldSplitUpdateEvent(updateEvent *RowChangedEvent) bool {
return false
}

// splitUpdateEvent splits an update event into a delete and an insert event.
func splitUpdateEvent(
// SplitUpdateEvent splits an update event into a delete and an insert event.
func SplitUpdateEvent(
updateEvent *RowChangedEvent,
) (*RowChangedEvent, *RowChangedEvent, error) {
if updateEvent == nil {
Expand Down
9 changes: 8 additions & 1 deletion cdc/model/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func TestTrySplitAndSortUpdateEvent(t *testing.T) {
require.Equal(t, 1, len(result))
}

func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
func TestTxnTrySplitAndSortUpdateEvent(t *testing.T) {
columns := []*Column{
{
Name: "col1",
Expand Down Expand Up @@ -614,6 +614,13 @@ func TestTrySplitAndSortUpdateEventOne(t *testing.T) {
err = txn.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
require.NoError(t, err)
require.Len(t, txn.Rows, 1)

// A MySQL-compatible sink must not split update events, so a transaction
// holding two unique-key updates still contains exactly two rows afterwards.
// The call must be made on txn2 itself, otherwise the length assertion below
// checks an untouched transaction and passes trivially.
txn2 := &SingleTableTxn{
	Rows: []*RowChangedEvent{ukUpdatedEvent, ukUpdatedEvent},
}
err = txn2.TrySplitAndSortUpdateEvent(sink.MySQLScheme)
require.NoError(t, err)
require.Len(t, txn2.Rows, 2)
}

func TestToRedoLog(t *testing.T) {
Expand Down
21 changes: 20 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io"
"net/url"
"strconv"
"sync"
"time"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -486,6 +488,18 @@ func isProcessorIgnorableError(err error) bool {
return false
}

// needPullerSafeModeAtStart reports whether the sink URI uses a MySQL
// compatible scheme.
// pullerSafeMode means to split all update kv entries whose commitTs
// is older than the start time of this changefeed.
//
// It returns ErrSinkURIInvalid (wrapping the parse error) when sinkURIStr
// cannot be parsed as a URL.
func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
	parsed, parseErr := url.Parse(sinkURIStr)
	if parseErr != nil {
		return false, cerror.WrapError(cerror.ErrSinkURIInvalid, parseErr)
	}
	return sink.IsMySQLCompatibleScheme(sink.GetScheme(parsed)), nil
}

// Tick implements the `orchestrator.State` interface
// the `info` parameter is sent by metadata store, the `info` must be the latest value snapshot.
// the `status` parameter is sent by metadata store, the `status` must be the latest value snapshot.
Expand Down Expand Up @@ -645,10 +659,15 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
return errors.Trace(err)
}

pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.latestInfo.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, util.GetOrZero(cfConfig.BDRMode),
util.GetOrZero(cfConfig.EnableTableMonitor))
util.GetOrZero(cfConfig.EnableTableMonitor),
pullerSafeModeAtStart)
p.sourceManager.name = "SourceManager"
p.sourceManager.changefeedID = p.changefeedID
p.sourceManager.spawn(prcCtx)
Expand Down
5 changes: 5 additions & 0 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,11 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("cost", time.Since(start)))

// For duplicate entry error, we fast fail to restart changefeed.
if cerror.IsDupEntryError(err) {
return errors.Trace(err)
}
}

// If the error is retryable, we should retry to re-establish the internal resources.
Expand Down
66 changes: 63 additions & 3 deletions cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/kv"
Expand All @@ -27,9 +28,13 @@ import (
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/puller"
"github.com/pingcap/tiflow/pkg/config"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/txnutil"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

Expand All @@ -50,6 +55,7 @@ type SourceManager struct {
engine sorter.SortEngine
// Used to indicate whether the changefeed is in BDR mode.
bdrMode bool
startTs model.Ts

enableTableMonitor bool
puller *puller.MultiplexingPuller
Expand All @@ -63,8 +69,9 @@ func New(
engine sorter.SortEngine,
bdrMode bool,
enableTableMonitor bool,
safeModeAtStart bool,
) *SourceManager {
return newSourceManager(changefeedID, up, mg, engine, bdrMode, enableTableMonitor)
return newSourceManager(changefeedID, up, mg, engine, bdrMode, enableTableMonitor, safeModeAtStart)
}

// NewForTest creates a new source manager for testing.
Expand All @@ -85,13 +92,31 @@ func NewForTest(
}
}

// isOldUpdateKVEntry reports whether raw is a non-nil update entry whose
// CRTs is strictly smaller than thresholdTs.
func isOldUpdateKVEntry(raw *model.RawKVEntry, thresholdTs model.Ts) bool {
	if raw == nil || !raw.IsUpdate() {
		return false
	}
	return raw.CRTs < thresholdTs
}

// splitUpdateKVEntry splits one kv entry into a delete entry and an insert
// entry, returned in that order.
//
// Both results are shallow copies of raw: the delete entry has its Value
// cleared and the insert entry has its OldValue cleared. raw itself is not
// modified. A nil raw yields an error.
func splitUpdateKVEntry(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error) {
	if raw == nil {
		return nil, nil, errors.New("nil event cannot be split")
	}

	deleteEntry, insertEntry := *raw, *raw
	deleteEntry.Value = nil
	insertEntry.OldValue = nil

	return &deleteEntry, &insertEntry, nil
}

func newSourceManager(
changefeedID model.ChangeFeedID,
up *upstream.Upstream,
mg entry.MounterGroup,
engine sorter.SortEngine,
bdrMode bool,
enableTableMonitor bool,
safeModeAtStart bool,
) *SourceManager {
mgr := &SourceManager{
ready: make(chan struct{}),
Expand Down Expand Up @@ -120,8 +145,18 @@ func newSourceManager(
zap.String("changefeed", mgr.changefeedID.ID))
}
if raw != nil {
pEvent := model.NewPolymorphicEvent(raw)
mgr.engine.Add(spans[0], pEvent)
if safeModeAtStart && isOldUpdateKVEntry(raw, mgr.startTs) {
deleteKVEntry, insertKVEntry, err := splitUpdateKVEntry(raw)
if err != nil {
return err
}
deleteEvent := model.NewPolymorphicEvent(deleteKVEntry)
insertEvent := model.NewPolymorphicEvent(insertKVEntry)
mgr.engine.Add(spans[0], deleteEvent, insertEvent)
} else {
pEvent := model.NewPolymorphicEvent(raw)
mgr.engine.Add(spans[0], pEvent)
}
}
return nil
}
Expand Down Expand Up @@ -192,6 +227,11 @@ func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error {
if m.puller == nil {
return nil
}
startTs, err := getCurrentTs(ctx, m.up.PDClient)
if err != nil {
return err
}
m.startTs = startTs
return m.puller.Run(ctx)
}

Expand Down Expand Up @@ -233,3 +273,23 @@ func (m *SourceManager) Close() {
func (m *SourceManager) Add(span tablepb.Span, events ...*model.PolymorphicEvent) {
m.engine.Add(span, events...)
}

// getCurrentTs asks PD for a timestamp and returns it as a composed TSO.
//
// The request is retried through retry.Do with a base backoff delay of 100
// (milliseconds, going by the option's variable name) and a total retry
// duration of 10 seconds; only errors accepted by cerrors.IsRetryableError
// are retried. On failure it returns 0 and the traced error.
func getCurrentTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) {
	const (
		baseDelayMs   int64 = 100
		retryDuration       = 10 * time.Second
	)

	var currentTs model.Ts
	fetchTs := func() error {
		physical, logical, err := pdClient.GetTS(ctx)
		if err != nil {
			return errors.Trace(err)
		}
		currentTs = oracle.ComposeTS(physical, logical)
		return nil
	}

	if err := retry.Do(ctx, fetchTs,
		retry.WithBackoffBaseDelay(baseDelayMs),
		retry.WithTotalRetryDuratoin(retryDuration),
		retry.WithIsRetryableErr(cerrors.IsRetryableError),
	); err != nil {
		return model.Ts(0), errors.Trace(err)
	}
	return currentTs, nil
}
21 changes: 1 addition & 20 deletions cdc/redo/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,26 +240,7 @@ func (l *LogReader) ReadNextRow(ctx context.Context) (*model.RowChangedEvent, er
return nil, errors.Trace(ctx.Err())
case rowInRedoLog := <-l.rowCh:
if rowInRedoLog != nil {
cols := rowInRedoLog.Columns
if cols == nil {
cols = rowInRedoLog.PreColumns
}
tableInfo := model.BuildTableInfo(
rowInRedoLog.Table.Schema,
rowInRedoLog.Table.Table,
cols,
rowInRedoLog.IndexColumns)
tableInfo.TableName.TableID = rowInRedoLog.Table.TableID
tableInfo.TableName.IsPartition = rowInRedoLog.Table.IsPartition
row := &model.RowChangedEvent{
StartTs: rowInRedoLog.StartTs,
CommitTs: rowInRedoLog.CommitTs,
PhysicalTableID: rowInRedoLog.Table.TableID,
TableInfo: tableInfo,
Columns: model.Columns2ColumnDatas(rowInRedoLog.Columns, tableInfo),
PreColumns: model.Columns2ColumnDatas(rowInRedoLog.PreColumns, tableInfo),
}
return row, nil
return rowInRedoLog.ToRowChangedEvent(), nil
}
return nil, nil
}
Expand Down
8 changes: 4 additions & 4 deletions cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type SinkFactory struct {
category Category
}

// New creates a new SinkFactory by schema.
// New creates a new SinkFactory by scheme.
func New(
ctx context.Context,
changefeedID model.ChangeFeedID,
Expand All @@ -79,8 +79,8 @@ func New(
}

s := &SinkFactory{}
schema := sink.GetScheme(sinkURI)
switch schema {
scheme := sink.GetScheme(sinkURI)
switch scheme {
case sink.MySQLScheme, sink.MySQLSSLScheme, sink.TiDBScheme, sink.TiDBSSLScheme:
txnSink, err := txn.NewMySQLSink(ctx, changefeedID, sinkURI, cfg, errCh,
txn.DefaultConflictDetectorSlots)
Expand Down Expand Up @@ -123,7 +123,7 @@ func New(
s.category = CategoryMQ
default:
return nil,
cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", schema)
cerror.ErrSinkURIInvalid.GenWithStack("the sink scheme (%s) is not supported", scheme)
}

return s, nil
Expand Down
Loading

0 comments on commit c710066

Please sign in to comment.