diff --git a/cdc/model/sink.go b/cdc/model/sink.go index f19b20754c8..d35004229aa 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -259,7 +259,6 @@ type RowChangedEvent struct { TableInfoVersion uint64 `json:"table-info-version,omitempty" msg:"table-info-version"` - ReplicaID uint64 `json:"replica-id" msg:"replica-id"` Columns []*Column `json:"columns" msg:"-"` PreColumns []*Column `json:"pre-columns" msg:"-"` IndexColumns [][]int `json:"-" msg:"index-columns"` diff --git a/cdc/model/sink_gen.go b/cdc/model/sink_gen.go index 153285792a7..5ebb68dd85f 100644 --- a/cdc/model/sink_gen.go +++ b/cdc/model/sink_gen.go @@ -2103,12 +2103,6 @@ func (z *RowChangedEvent) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "TableInfoVersion") return } - case "replica-id": - z.ReplicaID, err = dc.ReadUint64() - if err != nil { - err = msgp.WrapError(err, "ReplicaID") - return - } case "index-columns": var zb0002 uint32 zb0002, err = dc.ReadArrayHeader() @@ -2154,9 +2148,9 @@ func (z *RowChangedEvent) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *RowChangedEvent) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 6 + // map header, size 5 // write "start-ts" - err = en.Append(0x86, 0xa8, 0x73, 0x74, 0x61, 0x72, 0x74, 0x2d, 0x74, 0x73) + err = en.Append(0x85, 0xa8, 0x73, 0x74, 0x61, 0x72, 0x74, 0x2d, 0x74, 0x73) if err != nil { return } @@ -2202,16 +2196,6 @@ func (z *RowChangedEvent) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "TableInfoVersion") return } - // write "replica-id" - err = en.Append(0xaa, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x2d, 0x69, 0x64) - if err != nil { - return - } - err = en.WriteUint64(z.ReplicaID) - if err != nil { - err = msgp.WrapError(err, "ReplicaID") - return - } // write "index-columns" err = en.Append(0xad, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x2d, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73) if err != nil { @@ -2242,9 +2226,9 @@ func (z *RowChangedEvent) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *RowChangedEvent) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 6 + // map header, size 5 // string "start-ts" - o = append(o, 0x86, 0xa8, 0x73, 0x74, 0x61, 0x72, 0x74, 0x2d, 0x74, 0x73) + o = append(o, 0x85, 0xa8, 0x73, 0x74, 0x61, 0x72, 0x74, 0x2d, 0x74, 0x73) o = msgp.AppendUint64(o, z.StartTs) // string "commit-ts" o = append(o, 0xa9, 0x63, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x2d, 0x74, 0x73) @@ -2263,9 +2247,6 @@ func (z *RowChangedEvent) MarshalMsg(b []byte) (o []byte, err error) { // string "table-info-version" o = append(o, 0xb2, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x2d, 0x69, 0x6e, 0x66, 0x6f, 0x2d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e) o = msgp.AppendUint64(o, z.TableInfoVersion) - // string "replica-id" - o = append(o, 0xaa, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x2d, 0x69, 0x64) - o = msgp.AppendUint64(o, z.ReplicaID) // string "index-columns" o = append(o, 0xad, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x2d, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73) o = msgp.AppendArrayHeader(o, uint32(len(z.IndexColumns))) @@ -2331,12 +2312,6 @@ func (z *RowChangedEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "TableInfoVersion") return } - case "replica-id": - z.ReplicaID, bts, err = msgp.ReadUint64Bytes(bts) - if err != nil { - err = msgp.WrapError(err, "ReplicaID") - return - } case "index-columns": var zb0002 uint32 zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) @@ -2389,7 +2364,7 @@ func (z *RowChangedEvent) Msgsize() (s int) { } else { s += z.Table.Msgsize() } - s += 19 + msgp.Uint64Size + 11 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize + s += 19 + msgp.Uint64Size + 14 + msgp.ArrayHeaderSize for za0001 := range z.IndexColumns { s += msgp.ArrayHeaderSize + (len(z.IndexColumns[za0001]) * (msgp.IntSize)) } diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index 0acf5c63940..5597baff961 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -37,10 +37,9 @@ func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) { var txn *model.SingleTableTxn if len(t.txns) == 0 || row.SplitTxn || t.txns[len(t.txns)-1].StartTs < row.StartTs { txn = &model.SingleTableTxn{ - StartTs: row.StartTs, - CommitTs: row.CommitTs, - Table: row.Table, - ReplicaID: row.ReplicaID, + StartTs: row.StartTs, + CommitTs: row.CommitTs, + Table: row.Table, } t.txns = append(t.txns, txn) } else if t.txns[len(t.txns)-1].StartTs == row.StartTs { diff --git a/cdc/sinkv2/eventsink/event_appender.go b/cdc/sinkv2/eventsink/event_appender.go index 9dd6bdade2e..a57b13af5db 100644 --- a/cdc/sinkv2/eventsink/event_appender.go +++ b/cdc/sinkv2/eventsink/event_appender.go @@ -59,10 +59,9 @@ func (t *TxnEventAppender) Append( // This means no txn is in the buffer. if len(buffer) == 0 { txn := &model.SingleTableTxn{ - StartTs: row.StartTs, - CommitTs: row.CommitTs, - Table: row.Table, - ReplicaID: row.ReplicaID, + StartTs: row.StartTs, + CommitTs: row.CommitTs, + Table: row.Table, } txn.Append(row) buffer = append(buffer, txn) @@ -91,10 +90,9 @@ func (t *TxnEventAppender) Append( } buffer = append(buffer, &model.SingleTableTxn{ - StartTs: row.StartTs, - CommitTs: row.CommitTs, - Table: row.Table, - ReplicaID: row.ReplicaID, + StartTs: row.StartTs, + CommitTs: row.CommitTs, + Table: row.Table, }) }