From 8830dc82a62c1198a2d9d8af4ddbf961ac80d3e3 Mon Sep 17 00:00:00 2001
From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com>
Date: Wed, 8 Jan 2025 20:08:38 +0800
Subject: [PATCH] codec(ticdc): decoder set the table id directly (#11978)

close pingcap/tiflow#11986
---
 cmd/kafka-consumer/writer.go                  |  41 +----
 cmd/pulsar-consumer/main.go                   | 147 +++++-------------
 pkg/sink/codec/avro/avro_test.go              |   2 +-
 pkg/sink/codec/avro/decoder.go                |  13 +-
 pkg/sink/codec/canal/canal_json_decoder.go    |  68 +++++---
 pkg/sink/codec/canal/canal_json_message.go    |  74 ++++++++-
 .../canal_json_row_event_encoder_test.go      | 129 +++++++++++----
 .../canal/canal_json_txn_event_decoder.go     |   2 +-
 pkg/sink/codec/common/helper.go               |  35 +++++
 pkg/sink/codec/open/open_protocol_decoder.go  |  18 ++-
 pkg/sink/codec/open/open_protocol_message.go  |  12 +-
 11 files changed, 328 insertions(+), 213 deletions(-)

diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go
index 766d22b3702..412eac16025 100644
--- a/cmd/kafka-consumer/writer.go
+++ b/cmd/kafka-consumer/writer.go
@@ -17,7 +17,6 @@ import (
 	"context"
 	"database/sql"
 	"errors"
-	"fmt"
 	"math"
 	"sync"
 	"time"
@@ -32,7 +31,6 @@ import (
 	"github.com/pingcap/tiflow/cdc/sink/tablesink"
 	"github.com/pingcap/tiflow/pkg/config"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
-	"github.com/pingcap/tiflow/pkg/quotes"
 	"github.com/pingcap/tiflow/pkg/sink/codec"
 	"github.com/pingcap/tiflow/pkg/sink/codec/avro"
 	"github.com/pingcap/tiflow/pkg/sink/codec/canal"
@@ -117,10 +115,9 @@ func (p *partitionProgress) loadWatermark() uint64 {
 
 type writer struct {
 	option *option
-	ddlList              []*model.DDLEvent
-	ddlWithMaxCommitTs   *model.DDLEvent
-	ddlSink              ddlsink.Sink
-	fakeTableIDGenerator *fakeTableIDGenerator
+	ddlList            []*model.DDLEvent
+	ddlWithMaxCommitTs *model.DDLEvent
+	ddlSink            ddlsink.Sink
 	// sinkFactory is used to create table sink for each table.
 	sinkFactory *eventsinkfactory.SinkFactory
@@ -131,10 +128,7 @@ type writer struct {
 
 func newWriter(ctx context.Context, o *option) *writer {
 	w := &writer{
-		option: o,
-		fakeTableIDGenerator: &fakeTableIDGenerator{
-			tableIDs: make(map[string]int64),
-		},
+		option:     o,
 		progresses: make([]*partitionProgress, o.partitionNum),
 	}
 	var (
@@ -353,9 +347,8 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 		cachedEvents := dec.GetCachedEvents()
 		for _, row := range cachedEvents {
 			w.checkPartition(row, partition, message.TopicPartition.Offset)
-			tableID := row.GetTableID()
 			log.Info("simple protocol cached event resolved, append to the group",
-				zap.Int64("tableID", tableID), zap.Uint64("commitTs", row.CommitTs),
+				zap.Int64("tableID", row.GetTableID()), zap.Uint64("commitTs", row.CommitTs),
 				zap.Int32("partition", partition), zap.Any("offset", offset))
 			w.appendRow2Group(row, progress, offset)
 		}
@@ -384,15 +377,6 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 				continue
 			}
 			w.checkPartition(row, partition, message.TopicPartition.Offset)
-
-			tableID := row.GetTableID()
-			switch w.option.protocol {
-			case config.ProtocolSimple, config.ProtocolCanalJSON:
-			default:
-				tableID = w.fakeTableIDGenerator.
-					generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), tableID)
-				row.PhysicalTableID = tableID
-			}
 			w.appendRow2Group(row, progress, offset)
 		case model.MessageTypeResolved:
 			newWatermark, err := progress.decoder.NextResolvedEvent()
@@ -515,21 +499,6 @@ func (w *writer) appendRow2Group(row *model.RowChangedEvent, progress *partition
 	group.Append(row, offset)
 }
 
-type fakeTableIDGenerator struct {
-	tableIDs       map[string]int64
-	currentTableID int64
-}
-
-func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, tableID int64) int64 {
-	key := fmt.Sprintf("`%s`.`%s`.`%d`", quotes.EscapeName(schema), quotes.EscapeName(table), tableID)
-	if tableID, ok := g.tableIDs[key]; ok {
-		return tableID
-	}
-	g.currentTableID++
-	g.tableIDs[key] = g.currentTableID
-	return g.currentTableID
-}
-
 func syncFlushRowChangedEvents(ctx context.Context, progress *partitionProgress, watermark uint64) {
 	resolvedTs := model.NewResolvedTs(watermark)
 	for {
diff --git a/cmd/pulsar-consumer/main.go b/cmd/pulsar-consumer/main.go
index 0de8fdf4e17..1159b044097 100644
--- a/cmd/pulsar-consumer/main.go
+++ b/cmd/pulsar-consumer/main.go
@@ -41,7 +41,6 @@ import (
 	cmdUtil "github.com/pingcap/tiflow/pkg/cmd/util"
 	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/pingcap/tiflow/pkg/logutil"
-	"github.com/pingcap/tiflow/pkg/quotes"
 	"github.com/pingcap/tiflow/pkg/sink"
 	"github.com/pingcap/tiflow/pkg/sink/codec"
 	"github.com/pingcap/tiflow/pkg/sink/codec/canal"
@@ -85,19 +84,17 @@ type ConsumerOption struct {
 
 func newConsumerOption() *ConsumerOption {
 	return &ConsumerOption{
-		protocol: config.ProtocolDefault,
+		protocol: config.ProtocolCanalJSON,
+		// the default value of partitionNum is 1
+		partitionNum: 1,
 	}
 }
 
 // Adjust the consumer option by the upstream uri passed in parameters.
func (o *ConsumerOption) Adjust(upstreamURI *url.URL, configFile string) {
-	// the default value of partitionNum is 1
-	o.partitionNum = 1
-
 	o.topic = strings.TrimFunc(upstreamURI.Path, func(r rune) bool {
 		return r == '/'
 	})
-
 	o.address = strings.Split(upstreamURI.Host, ",")
 
 	replicaConfig := config.GetDefaultReplicaConfig()
@@ -115,12 +112,12 @@ func (o *ConsumerOption) Adjust(upstreamURI *url.URL, configFile string) {
 		if err != nil {
 			log.Panic("invalid protocol", zap.Error(err), zap.String("protocol", s))
 		}
-		if !sutil.IsPulsarSupportedProtocols(protocol) {
-			log.Panic("unsupported protocol, pulsar sink currently only support these protocols: [canal-json, canal, maxwell]",
-				zap.String("protocol", s))
-		}
 		o.protocol = protocol
 	}
+	if !sutil.IsPulsarSupportedProtocols(o.protocol) {
+		log.Panic("unsupported protocol, pulsar sink currently only support these protocols: [canal-json]",
+			zap.String("protocol", s))
+	}
 
 	s = upstreamURI.Query().Get("enable-tidb-extension")
 	if s != "" {
@@ -128,11 +125,6 @@ func (o *ConsumerOption) Adjust(upstreamURI *url.URL, configFile string) {
 		if err != nil {
 			log.Panic("invalid enable-tidb-extension of upstream-uri")
 		}
-		if enableTiDBExtension {
-			if o.protocol != config.ProtocolCanalJSON && o.protocol != config.ProtocolAvro {
-				log.Panic("enable-tidb-extension only work with canal-json / avro")
-			}
-		}
 		o.enableTiDBExtension = enableTiDBExtension
 	}
 
@@ -177,14 +169,11 @@ func main() {
 	}
 }
 
-func run(cmd *cobra.Command, args []string) {
+func run(_ *cobra.Command, _ []string) {
 	err := logutil.InitLogger(&logutil.Config{
 		Level: consumerOption.logLevel,
 		File:  consumerOption.logPath,
-	},
-		logutil.WithInitGRPCLogger(),
-		logutil.WithInitSaramaLogger(),
-	)
+	})
 	if err != nil {
 		log.Error("init logger failed", zap.Error(err))
 		return
@@ -216,7 +205,7 @@ func run(cmd *cobra.Command, args []string) {
 	defer pulsarConsumer.Close()
 	msgChan := pulsarConsumer.Chan()
 
-	wg := &sync.WaitGroup{}
+	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
@@ -229,7 +218,7 @@ func run(cmd *cobra.Command, args []string) {
 				log.Debug(fmt.Sprintf("Received message msgId: %#v -- content: '%s'\n",
 					consumerMsg.ID(), string(consumerMsg.Payload())))
-				err := consumer.HandleMsg(consumerMsg.Message)
+				err = consumer.HandleMsg(consumerMsg.Message)
 				if err != nil {
 					log.Panic("Error consuming message", zap.Error(err))
 				}
@@ -244,7 +233,7 @@ func run(cmd *cobra.Command, args []string) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		if err := consumer.Run(ctx); err != nil {
+		if err = consumer.Run(ctx); err != nil {
 			if err != context.Canceled {
 				log.Panic("Error running consumer", zap.Error(err))
 			}
@@ -328,6 +317,8 @@ func NewPulsarConsumer(option *ConsumerOption) (pulsar.Consumer, pulsar.Client)
 
 // partitionSinks maintained for each partition, it may sync data for multiple tables.
 type partitionSinks struct {
+	decoder codec.RowEventDecoder
+
 	tablesCommitTsMap sync.Map
 	tableSinksMap     sync.Map
 	// resolvedTs record the maximum timestamp of the received event
@@ -336,12 +327,11 @@ type partitionSinks struct {
 
 // Consumer represents a local pulsar consumer
 type Consumer struct {
-	eventGroups          map[int64]*eventsGroup
-	ddlList              []*model.DDLEvent
-	ddlListMu            sync.Mutex
-	lastReceivedDDL      *model.DDLEvent
-	ddlSink              ddlsink.Sink
-	fakeTableIDGenerator *fakeTableIDGenerator
+	eventGroups     map[int64]*eventsGroup
+	ddlList         []*model.DDLEvent
+	ddlListMu       sync.Mutex
+	lastReceivedDDL *model.DDLEvent
+	ddlSink         ddlsink.Sink
 	// sinkFactory is used to create table sink for each table.
 	sinkFactory *eventsinkfactory.SinkFactory
@@ -372,23 +362,21 @@ func NewConsumer(ctx context.Context, o *ConsumerOption) (*Consumer, error) {
 	config.GetGlobalServerConfig().TZ = o.timezone
 	c.tz = tz
 
-	c.fakeTableIDGenerator = &fakeTableIDGenerator{
-		tableIDs: make(map[string]int64),
-	}
-
 	c.codecConfig = common.NewConfig(o.protocol)
 	c.codecConfig.EnableTiDBExtension = o.enableTiDBExtension
-	if c.codecConfig.Protocol == config.ProtocolAvro {
-		c.codecConfig.AvroEnableWatermark = true
+	decoder, err := canal.NewBatchDecoder(ctx, c.codecConfig, nil)
+	if err != nil {
+		return nil, errors.Trace(err)
 	}
-
 	c.sinks = make([]*partitionSinks, o.partitionNum)
-	ctx, cancel := context.WithCancel(ctx)
-	errChan := make(chan error, 1)
 	for i := 0; i < o.partitionNum; i++ {
-		c.sinks[i] = &partitionSinks{}
+		c.sinks[i] = &partitionSinks{
+			decoder: decoder,
+		}
 	}
+	ctx, cancel := context.WithCancel(ctx)
+	errChan := make(chan error, 1)
 	changefeedID := model.DefaultChangeFeedID("pulsar-consumer")
 	f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, o.replicaConfig, errChan, nil)
 	if err != nil {
@@ -450,29 +438,8 @@ func (c *Consumer) HandleMsg(msg pulsar.Message) error {
 	c.sinksMu.Lock()
 	sink := c.sinks[0]
 	c.sinksMu.Unlock()
-	if sink == nil {
-		panic("sink should initialized")
-	}
-
-	ctx := context.Background()
-	var (
-		decoder codec.RowEventDecoder
-		err     error
-	)
-
-	switch c.codecConfig.Protocol {
-	case config.ProtocolCanalJSON:
-		decoder, err = canal.NewBatchDecoder(ctx, c.codecConfig, nil)
-		if err != nil {
-			return err
-		}
-	default:
-		log.Panic("Protocol not supported", zap.Any("Protocol", c.codecConfig.Protocol))
-	}
-	if err != nil {
-		return errors.Trace(err)
-	}
 
+	decoder := sink.decoder
 	if err := decoder.AddKeyValue([]byte(msg.Key()), msg.Payload()); err != nil {
 		log.Error("add key value to the decoder failed", zap.Error(err))
 		return errors.Trace(err)
@@ -503,7 +470,6 @@ func (c *Consumer) HandleMsg(msg pulsar.Message) error {
 				zap.ByteString("value", msg.Payload()),
 				zap.Error(err))
 		}
-		log.Info("DDL event received", zap.Any("DDL", ddl))
 		c.appendDDL(ddl)
 	case model.MessageTypeRow:
 		row, err := decoder.NextRowChangedEvent()
@@ -525,21 +491,18 @@ func (c *Consumer) HandleMsg(msg pulsar.Message) error {
 			continue
 		}
 		tableID := row.GetTableID()
-		// use schema, table and tableID to identify a table
-		switch c.option.protocol {
-		case config.ProtocolCanalJSON:
-		default:
-			tableID := c.fakeTableIDGenerator.
-				generateFakeTableID(row.TableInfo.GetSchemaName(), row.TableInfo.GetTableName(), tableID)
-			row.PhysicalTableID = tableID
-		}
-
 		group, ok := c.eventGroups[tableID]
 		if !ok {
 			group = newEventsGroup()
 			c.eventGroups[tableID] = group
 		}
 		group.Append(row)
+		log.Info("DML event received",
+			zap.Int64("tableID", row.GetTableID()),
+			zap.String("schema", row.TableInfo.GetSchemaName()),
+			zap.String("table", row.TableInfo.GetTableName()),
+			zap.Uint64("commitTs", row.CommitTs),
+			zap.Any("columns", row.Columns), zap.Any("preColumns", row.PreColumns))
 	case model.MessageTypeResolved:
 		ts, err := decoder.NextResolvedEvent()
 		if err != nil {
@@ -585,6 +548,7 @@ func (c *Consumer) HandleMsg(msg pulsar.Message) error {
 			}
 		}
 		atomic.StoreUint64(&sink.resolvedTs, ts)
+		log.Info("resolved ts updated", zap.Uint64("resolvedTs", ts))
 	}
 
 }
@@ -601,7 +565,7 @@ func (c *Consumer) appendDDL(ddl *model.DDLEvent) {
 		log.Panic("DDL CommitTs < lastReceivedDDL.CommitTs",
 			zap.Uint64("commitTs", ddl.CommitTs),
 			zap.Uint64("lastReceivedDDLCommitTs", c.lastReceivedDDL.CommitTs),
-			zap.Any("DDL", ddl))
+			zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
 	}
 
 	// A rename tables DDL job contains multiple DDL events with same CommitTs.
 	// So to tell if a DDL is redundant or not, we must check the equivalence of
 	// the current DDL and the DDL with max CommitTs.
 	if ddl == c.lastReceivedDDL {
 		log.Info("ignore redundant DDL, the DDL is equal to ddlWithMaxCommitTs",
-			zap.Any("DDL", ddl))
+			zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
 		return
 	}
 
 	c.ddlList = append(c.ddlList, ddl)
-	log.Info("DDL event received", zap.Any("DDL", ddl))
+	log.Info("DDL event received", zap.Uint64("commitTs", ddl.CommitTs), zap.String("DDL", ddl.Query))
 	c.lastReceivedDDL = ddl
 }
 
@@ -650,16 +614,16 @@ func (c *Consumer) forEachSink(fn func(sink *partitionSinks) error) error {
 }
 
 // getMinResolvedTs returns the minimum resolvedTs of all the partitionSinks
-func (c *Consumer) getMinResolvedTs() (result uint64, err error) {
-	result = uint64(math.MaxUint64)
-	err = c.forEachSink(func(sink *partitionSinks) error {
+func (c *Consumer) getMinResolvedTs() uint64 {
+	result := uint64(math.MaxUint64)
+	_ = c.forEachSink(func(sink *partitionSinks) error {
 		a := atomic.LoadUint64(&sink.resolvedTs)
 		if a < result {
 			result = a
 		}
 		return nil
 	})
-	return result, err
+	return result
 }
 
 // Run the Consumer
@@ -671,11 +635,7 @@ func (c *Consumer) Run(ctx context.Context) error {
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-ticker.C:
-			// 1. Get the minimum resolvedTs of all the partitionSinks
-			minResolvedTs, err := c.getMinResolvedTs()
-			if err != nil {
-				return errors.Trace(err)
-			}
+			minResolvedTs := c.getMinResolvedTs()
 			// 2. check if there is a DDL event that can be executed
 			// if there is, execute it and update the minResolvedTs
@@ -753,24 +713,3 @@ func flushRowChangedEvents(ctx context.Context, sink *partitionSinks, resolvedTs
 		}
 	}
 }
-
-type fakeTableIDGenerator struct {
-	tableIDs       map[string]int64
-	currentTableID int64
-	mu             sync.Mutex
-}
-
-func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partition int64) int64 {
-	g.mu.Lock()
-	defer g.mu.Unlock()
-	key := quotes.QuoteSchema(schema, table)
-	if partition != 0 {
-		key = fmt.Sprintf("%s.`%d`", key, partition)
-	}
-	if tableID, ok := g.tableIDs[key]; ok {
-		return tableID
-	}
-	g.currentTableID++
-	g.tableIDs[key] = g.currentTableID
-	return g.currentTableID
-}
diff --git a/pkg/sink/codec/avro/avro_test.go b/pkg/sink/codec/avro/avro_test.go
index 69723e5e58f..70d87e4b757 100644
--- a/pkg/sink/codec/avro/avro_test.go
+++ b/pkg/sink/codec/avro/avro_test.go
@@ -76,7 +76,7 @@ func TestDMLEventE2E(t *testing.T) {
 	decodedEvent, err := decoder.NextRowChangedEvent()
 	require.NoError(t, err)
 	require.NotNil(t, decodedEvent)
-	require.Equal(t, decodedEvent.GetTableID(), int64(0))
+	require.NotZero(t, decodedEvent.GetTableID())
 
 	TeardownEncoderAndSchemaRegistry4Testing()
 }
diff --git a/pkg/sink/codec/avro/decoder.go b/pkg/sink/codec/avro/decoder.go
index 2bce9e6e0f2..a8474e238f8 100644
--- a/pkg/sink/codec/avro/decoder.go
+++ b/pkg/sink/codec/avro/decoder.go
@@ -37,7 +37,8 @@ type decoder struct {
 	config *common.Config
 	topic  string
 
-	upstreamTiDB *sql.DB
+	upstreamTiDB     *sql.DB
+	tableIDAllocator *common.FakeTableIDAllocator
 
 	schemaM SchemaManager
 
@@ -53,10 +54,11 @@ func NewDecoder(
 	db *sql.DB,
 ) codec.RowEventDecoder {
 	return &decoder{
-		config:       config,
-		topic:        topic,
-		schemaM:      schemaM,
-		upstreamTiDB: db,
+		config:           config,
+		topic:            topic,
+		schemaM:          schemaM,
+		upstreamTiDB:     db,
+		tableIDAllocator: common.NewFakeTableIDAllocator(),
 	}
 }
 
@@ -124,6 +126,7 @@ func (d *decoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
+	event.PhysicalTableID = d.tableIDAllocator.AllocateTableID(event.TableInfo.GetSchemaName(), event.TableInfo.GetTableName())
 
 	// Delete event only has Primary Key Columns, but the checksum is calculated based on the whole row columns,
 	// checksum verification cannot be done here, so skip it.
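The avro decoder change above, like the canal-json and open-protocol changes below, relies on one small contract: common.FakeTableIDAllocator hands out a new synthetic table ID the first time a (schema, table) key is looked up and returns the identical ID on every later lookup, so all events of one table group under a single stable PhysicalTableID for the lifetime of a decoder. A minimal standalone sketch of that contract (a trimmed copy of the allocator added to pkg/sink/codec/common/helper.go later in this patch; the package name and printed values are illustrative):

// allocator_sketch.go: trimmed, illustrative copy of the allocation scheme;
// the real implementation is common.FakeTableIDAllocator below.
package main

import "fmt"

type fakeTableIDAllocator struct {
	tableIDs       map[string]int64
	currentTableID int64
}

// allocateByKey returns the ID already bound to key, or binds the next
// unused ID to it. IDs therefore start at 1 and are stable per key.
func (g *fakeTableIDAllocator) allocateByKey(key string) int64 {
	if id, ok := g.tableIDs[key]; ok {
		return id
	}
	g.currentTableID++
	g.tableIDs[key] = g.currentTableID
	return g.currentTableID
}

func main() {
	alloc := &fakeTableIDAllocator{tableIDs: make(map[string]int64)}
	fmt.Println(alloc.allocateByKey("`test`.`t1`")) // 1
	fmt.Println(alloc.allocateByKey("`test`.`t2`")) // 2
	fmt.Println(alloc.allocateByKey("`test`.`t1`")) // 1 again: stable per table
}

The IDs are only meaningful within a single allocator instance: two consumers may assign different IDs to the same table, which is acceptable here because consumers use them only to group and route events, never to address real TiDB tables.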
diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go
index bc3ebd9544d..e46d45a8238 100644
--- a/pkg/sink/codec/canal/canal_json_decoder.go
+++ b/pkg/sink/codec/canal/canal_json_decoder.go
@@ -26,7 +26,11 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/br/pkg/storage"
+	"github.com/pingcap/tidb/pkg/ddl"
+	"github.com/pingcap/tidb/pkg/meta/metabuild"
 	timodel "github.com/pingcap/tidb/pkg/meta/model"
+	"github.com/pingcap/tidb/pkg/parser"
+	"github.com/pingcap/tidb/pkg/parser/ast"
 	pmodel "github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/parser/mysql"
 	"github.com/pingcap/tiflow/cdc/model"
@@ -91,7 +95,10 @@ type batchDecoder struct {
 	upstreamTiDB *sql.DB
 	bytesDecoder *encoding.Decoder
 
-	tableInfoCache map[tableKey]*model.TableInfo
+	tableInfoCache     map[tableKey]*model.TableInfo
+	partitionInfoCache map[tableKey]*timodel.PartitionInfo
+
+	tableIDAllocator *common.FakeTableIDAllocator
 }
 
 // NewBatchDecoder return a decoder for canal-json
@@ -115,22 +122,15 @@ func NewBatchDecoder(
 			GenWithStack("handle-key-only is enabled, but upstream TiDB is not provided")
 	}
 
-	var msg canalJSONMessageInterface = &JSONMessage{}
-	if codecConfig.EnableTiDBExtension {
-		msg = &canalJSONMessageWithTiDBExtension{
-			JSONMessage: &JSONMessage{},
-			Extensions:  &tidbExtension{},
-		}
-	}
-
 	return &batchDecoder{
-		config:         codecConfig,
-		msg:            msg,
-		decoder:        newBufferedJSONDecoder(),
-		storage:        externalStorage,
-		upstreamTiDB:   db,
-		bytesDecoder:   charmap.ISO8859_1.NewDecoder(),
-		tableInfoCache: make(map[tableKey]*model.TableInfo),
+		config:             codecConfig,
+		decoder:            newBufferedJSONDecoder(),
+		storage:            externalStorage,
+		upstreamTiDB:       db,
+		bytesDecoder:       charmap.ISO8859_1.NewDecoder(),
+		tableInfoCache:     make(map[tableKey]*model.TableInfo),
+		partitionInfoCache: make(map[tableKey]*timodel.PartitionInfo),
+		tableIDAllocator:   common.NewFakeTableIDAllocator(),
 	}, nil
 }
 
@@ -156,11 +156,20 @@ func (b *batchDecoder) HasNext() (model.MessageType, bool, error) {
 		return model.MessageTypeUnknown, false, nil
 	}
 
-	if err := b.decoder.Decode(b.msg); err != nil {
+	var msg canalJSONMessageInterface = &JSONMessage{}
+	if b.config.EnableTiDBExtension {
+		msg = &canalJSONMessageWithTiDBExtension{
+			JSONMessage: &JSONMessage{},
+			Extensions:  &tidbExtension{},
+		}
+	}
+
+	if err := b.decoder.Decode(msg); err != nil {
 		log.Error("canal-json decoder decode failed",
 			zap.Error(err), zap.ByteString("data", b.decoder.Bytes()))
 		return model.MessageTypeUnknown, false, err
 	}
+	b.msg = msg
 	return b.msg.messageType(), true, nil
 }
 
@@ -370,14 +379,31 @@ func (b *batchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
 	result := canalJSONMessage2DDLEvent(b.msg)
 	schema := *b.msg.getSchema()
 	table := *b.msg.getTable()
+	cacheKey := tableKey{
+		schema: schema,
+		table:  table,
+	}
 	// if receive a table level DDL, just remove the table info to trigger create a new one.
 	if schema != "" && table != "" {
-		cacheKey := tableKey{
-			schema: schema,
-			table:  table,
-		}
 		delete(b.tableInfoCache, cacheKey)
+		delete(b.partitionInfoCache, cacheKey)
+	}
+
+	stmt, err := parser.New().ParseOneStmt(result.Query, "", "")
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if v, ok := stmt.(*ast.CreateTableStmt); ok {
+		tableInfo, err := ddl.BuildTableInfoFromAST(metabuild.NewContext(), v)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		partitions := tableInfo.GetPartitionInfo()
+		if partitions != nil {
+			b.partitionInfoCache[cacheKey] = partitions
+		}
 	}
+
 	return result, nil
 }
diff --git a/pkg/sink/codec/canal/canal_json_message.go b/pkg/sink/codec/canal/canal_json_message.go
index d65e47fb29e..fda1b3768e2 100644
--- a/pkg/sink/codec/canal/canal_json_message.go
+++ b/pkg/sink/codec/canal/canal_json_message.go
@@ -14,6 +14,7 @@
 package canal
 
 import (
+	"fmt"
 	"strconv"
 	"strings"
 
@@ -187,19 +188,29 @@ func (c *canalJSONMessageWithTiDBExtension) isPartition() bool {
 }
 
 func (b *batchDecoder) queryTableInfo(msg canalJSONMessageInterface) *model.TableInfo {
+	schema := *msg.getSchema()
+	table := *msg.getTable()
 	cacheKey := tableKey{
-		schema: *msg.getSchema(),
-		table:  *msg.getTable(),
+		schema: schema,
+		table:  table,
 	}
 	tableInfo, ok := b.tableInfoCache[cacheKey]
 	if !ok {
-		tableInfo = newTableInfo(msg)
+		partitionInfo := b.partitionInfoCache[cacheKey]
+		tableInfo = newTableInfo(msg, partitionInfo)
+		tableInfo.ID = b.tableIDAllocator.AllocateTableID(schema, table)
+		if tableInfo.Partition != nil {
+			for idx, partition := range tableInfo.Partition.Definitions {
+				partitionID := b.tableIDAllocator.AllocatePartitionID(schema, table, partition.Name.O)
+				tableInfo.Partition.Definitions[idx].ID = partitionID
+			}
+		}
 		b.tableInfoCache[cacheKey] = tableInfo
 	}
 	return tableInfo
 }
 
-func newTableInfo(msg canalJSONMessageInterface) *model.TableInfo {
+func newTableInfo(msg canalJSONMessageInterface, partitionInfo *timodel.PartitionInfo) *model.TableInfo {
 	schema := *msg.getSchema()
 	table := *msg.getTable()
 	tidbTableInfo := &timodel.TableInfo{}
@@ -210,15 +221,59 @@ func newTableInfo(msg canalJSONMessageInterface) *model.TableInfo {
 	mysqlType := msg.getMySQLType()
 	setColumnInfos(tidbTableInfo, rawColumns, mysqlType, pkNames)
 	setIndexes(tidbTableInfo, pkNames)
+	tidbTableInfo.Partition = partitionInfo
 	return model.WrapTableInfo(100, schema, 1000, tidbTableInfo)
 }
 
+func (b *batchDecoder) setPhysicalTableID(event *model.RowChangedEvent, physicalTableID int64) error {
+	if physicalTableID != 0 {
+		event.PhysicalTableID = physicalTableID
+		return nil
+	}
+	if event.TableInfo.Partition == nil {
+		event.PhysicalTableID = event.TableInfo.ID
+		return nil
+	}
+	switch event.TableInfo.Partition.Type {
+	case pmodel.PartitionTypeRange:
+		targetColumnID := event.TableInfo.ForceGetColumnIDByName(strings.ReplaceAll(event.TableInfo.Partition.Expr, "`", ""))
+		columns := event.Columns
+		if columns == nil {
+			columns = event.PreColumns
+		}
+		var columnValue string
+		for _, col := range columns {
+			if col.ColumnID == targetColumnID {
+				columnValue = model.ColumnValueString(col.Value)
+				break
+			}
+		}
+		for _, partition := range event.TableInfo.Partition.Definitions {
+			if partition.LessThan[0] == "MAXVALUE" {
+				event.PhysicalTableID = partition.ID
+				return nil
+			}
+			if strings.Compare(columnValue, partition.LessThan[0]) == -1 {
+				event.PhysicalTableID = partition.ID
+				return nil
+			}
+		}
+		return fmt.Errorf("cannot find partition for column value %s", columnValue)
+	// todo: support following rules if meet the corresponding workload
+	case pmodel.PartitionTypeHash:
+	case pmodel.PartitionTypeKey:
+	case pmodel.PartitionTypeList:
+	case pmodel.PartitionTypeNone:
+	default:
+	}
+	return fmt.Errorf("manually set partition id for partition type %s not supported yet", event.TableInfo.Partition.Type)
+}
+
 func (b *batchDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, error) {
 	msg := b.msg
 	result := new(model.RowChangedEvent)
 	result.TableInfo = b.queryTableInfo(msg)
 	result.CommitTs = msg.getCommitTs()
-	result.PhysicalTableID = msg.getPhysicalTableID()
 	mysqlType := msg.getMySQLType()
 	result.TableInfo.TableName.IsPartition = msg.isPartition()
 	result.TableInfo.TableName.TableID = msg.getTableID()
@@ -230,6 +285,10 @@ func (b *batchDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, err
 	if err != nil {
 		return nil, err
 	}
+	err = b.setPhysicalTableID(result, msg.getPhysicalTableID())
+	if err != nil {
+		return nil, err
+	}
 	return result, nil
 }
 
@@ -264,7 +323,10 @@ func (b *batchDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, err
 			log.Panic("column count mismatch", zap.Any("preCols", preCols), zap.Any("cols", result.Columns))
 		}
 	}
-
+	err = b.setPhysicalTableID(result, msg.getPhysicalTableID())
+	if err != nil {
+		return nil, err
+	}
 	return result, nil
 }
diff --git a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
index 99233fd658f..dd7f891a0dd 100644
--- a/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
+++ b/pkg/sink/codec/canal/canal_json_row_event_encoder_test.go
@@ -42,23 +42,20 @@ func TestBuildCanalJSONRowEventEncoder(t *testing.T) {
 }
 
 func TestDMLE2E(t *testing.T) {
-	_, insertEvent, updateEvent, deleteEvent := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig())
+	createTableDDLEvent, insertEvent, updateEvent, deleteEvent := utils.NewLargeEvent4Test(t, config.GetDefaultReplicaConfig())
 
 	ctx := context.Background()
 	codecConfig := common.NewConfig(config.ProtocolCanalJSON)
-	for _, enableTiDBExtension := range []bool{true, false} {
+	for _, enableTiDBExtension := range []bool{false, true} {
 		codecConfig.EnableTiDBExtension = enableTiDBExtension
 
 		builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
 		require.NoError(t, err)
-
 		encoder := builder.Build()
 
-		err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {})
+		decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
 		require.NoError(t, err)
 
-		message := encoder.Build()[0]
-
-		decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
+		message, err := encoder.EncodeDDLEvent(createTableDDLEvent)
 		require.NoError(t, err)
 
 		err = decoder.AddKeyValue(message.Key, message.Value)
@@ -67,6 +64,25 @@ func TestDMLE2E(t *testing.T) {
 		messageType, hasNext, err := decoder.HasNext()
 		require.NoError(t, err)
 		require.True(t, hasNext)
+		require.Equal(t, messageType, model.MessageTypeDDL)
+
+		decodedDDL, err := decoder.NextDDLEvent()
+		require.NoError(t, err)
+		if enableTiDBExtension {
+			require.Equal(t, createTableDDLEvent.CommitTs, decodedDDL.CommitTs)
+		}
+		require.Equal(t, createTableDDLEvent.Query, decodedDDL.Query)
+
+		err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, func() {})
+		require.NoError(t, err)
+
+		message = encoder.Build()[0]
+		err = decoder.AddKeyValue(message.Key, message.Value)
+		require.NoError(t, err)
+
+		messageType, hasNext, err = decoder.HasNext()
+		require.NoError(t, err)
+		require.True(t, hasNext)
 		require.Equal(t, messageType, model.MessageTypeRow)
 
 		decodedEvent, err := decoder.NextRowChangedEvent()
@@ -76,7 +92,9 @@ func TestDMLE2E(t *testing.T) {
 		if enableTiDBExtension {
 			require.Equal(t, insertEvent.CommitTs, decodedEvent.CommitTs)
 			require.Equal(t, insertEvent.GetTableID(), decodedEvent.GetTableID())
-			require.Equal(t, insertEvent.TableInfo.IsPartitionTable(), decodedEvent.TableInfo.IsPartitionTable())
+		} else {
+			require.NotZero(t, decodedEvent.GetTableID())
+			require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.ID)
 		}
 		require.Equal(t, insertEvent.TableInfo.GetSchemaName(), decodedEvent.TableInfo.GetSchemaName())
 		require.Equal(t, insertEvent.TableInfo.GetTableName(), decodedEvent.TableInfo.GetTableName())
@@ -734,17 +752,6 @@ func TestE2EPartitionTable(t *testing.T) {
 	helper := entry.NewSchemaTestHelper(t)
 	defer helper.Close()
 
-	ctx := context.Background()
-	codecConfig := common.NewConfig(config.ProtocolCanalJSON)
-	codecConfig.EnableTiDBExtension = true
-
-	builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
-	require.NoError(t, err)
-	encoder := builder.Build()
-
-	decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
-	require.NoError(t, err)
-
 	helper.Tk().MustExec("use test")
 
 	createPartitionTableDDL := helper.DDL2Event(`create table test.t(a int primary key, b int) partition by range (a) (
@@ -762,24 +769,94 @@ func TestE2EPartitionTable(t *testing.T) {
 	insertEvent2 := helper.DML2Event(`insert into test.t values (21, 21)`, "test", "t", "p2")
 	require.NotNil(t, insertEvent2)
 
-	events := []*model.RowChangedEvent{insertEvent, insertEvent1, insertEvent2}
+	ctx := context.Background()
+	codecConfig := common.NewConfig(config.ProtocolCanalJSON)
+	for _, enableTiDBExtension := range []bool{false, true} {
+		codecConfig.EnableTiDBExtension = enableTiDBExtension
 
-	for _, event := range events {
-		err = encoder.AppendRowChangedEvent(ctx, "", event, nil)
+		builder, err := NewJSONRowEventEncoderBuilder(ctx, codecConfig)
+		require.NoError(t, err)
+		encoder := builder.Build()
+
+		decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
+		require.NoError(t, err)
+
+		message, err := encoder.EncodeDDLEvent(createPartitionTableDDL)
 		require.NoError(t, err)
 
-		message := encoder.Build()[0]
 		err = decoder.AddKeyValue(message.Key, message.Value)
 		require.NoError(t, err)
+
 		tp, hasNext, err := decoder.HasNext()
 		require.NoError(t, err)
 		require.True(t, hasNext)
+		require.Equal(t, model.MessageTypeDDL, tp)
+
+		decodedDDL, err := decoder.NextDDLEvent()
+		require.NoError(t, err)
+		require.NotNil(t, decodedDDL)
+
+		err = encoder.AppendRowChangedEvent(ctx, "", insertEvent, nil)
+		require.NoError(t, err)
+		message = encoder.Build()[0]
+
+		err = decoder.AddKeyValue(message.Key, message.Value)
+		require.NoError(t, err)
+		tp, hasNext, err = decoder.HasNext()
+		require.NoError(t, err)
+		require.True(t, hasNext)
 		require.Equal(t, model.MessageTypeRow, tp)
 
 		decodedEvent, err := decoder.NextRowChangedEvent()
 		require.NoError(t, err)
-		require.Equal(t, decodedEvent.GetTableID(), event.GetTableID())
-		require.Equal(t, decodedEvent.TableInfo.TableName.TableID, event.TableInfo.TableName.TableID)
-		require.Equal(t, decodedEvent.TableInfo.IsPartitionTable(), event.TableInfo.IsPartitionTable())
+
+		if enableTiDBExtension {
+			require.Equal(t, decodedEvent.GetTableID(), insertEvent.GetTableID())
+		} else {
+			require.NotZero(t, decodedEvent.GetTableID())
+			require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[0].ID)
+		}
+
+		err = encoder.AppendRowChangedEvent(ctx, "", insertEvent1, nil)
+		require.NoError(t, err)
+		message = encoder.Build()[0]
+
+		err = decoder.AddKeyValue(message.Key, message.Value)
+		require.NoError(t, err)
+		tp, hasNext, err = decoder.HasNext()
+		require.NoError(t, err)
+		require.True(t, hasNext)
+		require.Equal(t, model.MessageTypeRow, tp)
+
+		decodedEvent, err = decoder.NextRowChangedEvent()
+		require.NoError(t, err)
+
+		if enableTiDBExtension {
+			require.Equal(t, decodedEvent.GetTableID(), insertEvent1.GetTableID())
+		} else {
+			require.NotZero(t, decodedEvent.GetTableID())
+			require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[1].ID)
+		}
+
+		err = encoder.AppendRowChangedEvent(ctx, "", insertEvent2, nil)
+		require.NoError(t, err)
+		message = encoder.Build()[0]
+
+		err = decoder.AddKeyValue(message.Key, message.Value)
+		require.NoError(t, err)
+		tp, hasNext, err = decoder.HasNext()
+		require.NoError(t, err)
+		require.True(t, hasNext)
+		require.Equal(t, model.MessageTypeRow, tp)
+
+		decodedEvent, err = decoder.NextRowChangedEvent()
+		require.NoError(t, err)
+
+		if enableTiDBExtension {
+			require.Equal(t, decodedEvent.GetTableID(), insertEvent2.GetTableID())
+		} else {
+			require.NotZero(t, decodedEvent.GetTableID())
+			require.Equal(t, decodedEvent.GetTableID(), decodedEvent.TableInfo.GetPartitionInfo().Definitions[2].ID)
+		}
 	}
 }
diff --git a/pkg/sink/codec/canal/canal_json_txn_event_decoder.go b/pkg/sink/codec/canal/canal_json_txn_event_decoder.go
index de537720107..cf0b326084d 100644
--- a/pkg/sink/codec/canal/canal_json_txn_event_decoder.go
+++ b/pkg/sink/codec/canal/canal_json_txn_event_decoder.go
@@ -113,7 +113,7 @@ func (d *canalJSONTxnEventDecoder) NextRowChangedEvent() (*model.RowChangedEvent
 func (d *canalJSONTxnEventDecoder) canalJSONMessage2RowChange() (*model.RowChangedEvent, error) {
 	msg := d.msg
 	result := new(model.RowChangedEvent)
-	result.TableInfo = newTableInfo(msg)
+	result.TableInfo = newTableInfo(msg, nil)
 	result.CommitTs = msg.getCommitTs()
 	result.PhysicalTableID = msg.getPhysicalTableID()
 
diff --git a/pkg/sink/codec/common/helper.go b/pkg/sink/codec/common/helper.go
index 3257fee5a5c..b42599a7c4f 100644
--- a/pkg/sink/codec/common/helper.go
+++ b/pkg/sink/codec/common/helper.go
@@ -25,6 +25,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/quotes"
 	"go.uber.org/zap"
 )
 
@@ -387,3 +388,37 @@ func UnsafeBytesToString(b []byte) string {
 func UnsafeStringToBytes(s string) []byte {
 	return *(*[]byte)(unsafe.Pointer(&s))
 }
+
+// FakeTableIDAllocator is a fake table id allocator
+type FakeTableIDAllocator struct {
+	tableIDs       map[string]int64
+	currentTableID int64
+}
+
+// NewFakeTableIDAllocator creates a new FakeTableIDAllocator
+func NewFakeTableIDAllocator() *FakeTableIDAllocator {
+	return &FakeTableIDAllocator{
+		tableIDs: make(map[string]int64),
+	}
+}
+
+func (g *FakeTableIDAllocator) allocateByKey(key string) int64 {
+	if tableID, ok := g.tableIDs[key]; ok {
+		return tableID
+	}
+	g.currentTableID++
+	g.tableIDs[key] = g.currentTableID
+	return g.currentTableID
+}
+
+// AllocateTableID allocates a table id
+func (g *FakeTableIDAllocator) AllocateTableID(schema, table string) int64 {
+	key := fmt.Sprintf("`%s`.`%s`", quotes.EscapeName(schema), quotes.EscapeName(table))
+	return g.allocateByKey(key)
+}
+
+// AllocatePartitionID allocates a partition id
+func (g *FakeTableIDAllocator) AllocatePartitionID(schema, table, name string) int64 {
+	key := fmt.Sprintf("`%s`.`%s`.`%s`", quotes.EscapeName(schema), quotes.EscapeName(table), quotes.EscapeName(name))
+	return g.allocateByKey(key)
+}
diff --git a/pkg/sink/codec/open/open_protocol_decoder.go b/pkg/sink/codec/open/open_protocol_decoder.go
index fb31e7e660c..9089093cbb4 100644
--- a/pkg/sink/codec/open/open_protocol_decoder.go
+++ b/pkg/sink/codec/open/open_protocol_decoder.go
@@ -48,6 +48,8 @@ type BatchDecoder struct {
 
 	config       *common.Config
 	upstreamTiDB *sql.DB
+
+	tableIDAllocator *common.FakeTableIDAllocator
 }
 
 // NewBatchDecoder creates a new BatchDecoder.
@@ -70,9 +72,10 @@ func NewBatchDecoder(ctx context.Context, config *common.Config, db *sql.DB) (co
 	}
 
 	return &BatchDecoder{
-		config:       config,
-		storage:      externalStorage,
-		upstreamTiDB: db,
+		config:           config,
+		storage:          externalStorage,
+		upstreamTiDB:     db,
+		tableIDAllocator: common.NewFakeTableIDAllocator(),
 	}, nil
 }
 
@@ -146,11 +149,10 @@ func (b *BatchDecoder) HasNext() (model.MessageType, bool, error) {
 			return model.MessageTypeUnknown, false, cerror.ErrOpenProtocolCodecInvalidData.
 				GenWithStack("decompress data failed")
 		}
-
-		if err := rowMsg.decode(value); err != nil {
+		if err = rowMsg.decode(value); err != nil {
 			return b.nextKey.Type, false, errors.Trace(err)
 		}
-		b.nextEvent = msgToRowChange(b.nextKey, rowMsg)
+		b.nextEvent = b.msgToRowChange(b.nextKey, rowMsg)
 	}
 
 	return b.nextKey.Type, true, nil
@@ -184,7 +186,7 @@ func (b *BatchDecoder) NextDDLEvent() (*model.DDLEvent, error) {
 	}
 
 	ddlMsg := new(messageDDL)
-	if err := ddlMsg.decode(value); err != nil {
+	if err = ddlMsg.decode(value); err != nil {
 		return nil, errors.Trace(err)
 	}
 	ddlEvent := msgToDDLEvent(b.nextKey, ddlMsg)
@@ -344,7 +346,7 @@ func (b *BatchDecoder) assembleEventFromClaimCheckStorage(ctx context.Context) (
 		return nil, errors.Trace(err)
 	}
 
-	event := msgToRowChange(msgKey, rowMsg)
+	event := b.msgToRowChange(msgKey, rowMsg)
 	return event, nil
 }
 
diff --git a/pkg/sink/codec/open/open_protocol_message.go b/pkg/sink/codec/open/open_protocol_message.go
index d9124c2063e..250dad2017f 100644
--- a/pkg/sink/codec/open/open_protocol_message.go
+++ b/pkg/sink/codec/open/open_protocol_message.go
@@ -166,7 +166,7 @@ func rowChangeToMsg(
 	return key, value, nil
 }
 
-func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChangedEvent {
+func (b *BatchDecoder) msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChangedEvent {
 	e := new(model.RowChangedEvent)
 	// TODO: we lost the startTs from kafka message
 	// startTs-based txn filter is out of work
@@ -193,6 +193,8 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang
 	if key.Partition != nil {
 		e.PhysicalTableID = *key.Partition
 		e.TableInfo.TableName.IsPartition = true
+	} else {
+		e.PhysicalTableID = b.tableIDAllocator.AllocateTableID(key.Schema, key.Table)
 	}
 
 	return e
@@ -201,10 +203,10 @@ func msgToRowChange(key *internal.MessageKey, value *messageRow) *model.RowChang
 func rowChangeColumns2CodecColumns(cols []*model.ColumnData, tb *model.TableInfo, onlyHandleKeyColumns bool) map[string]internal.Column {
 	jsonCols := make(map[string]internal.Column, len(cols))
 	for _, col := range cols {
-        colx := model.GetColumnDataX(col, tb)
-        if colx.ColumnData == nil || onlyHandleKeyColumns && !colx.GetFlag().IsHandleKey() {
-            continue
-        }
+		colx := model.GetColumnDataX(col, tb)
+		if colx.ColumnData == nil || onlyHandleKeyColumns && !colx.GetFlag().IsHandleKey() {
+			continue
+		}
 		c := internal.Column{}
 		c.FromRowChangeColumn(colx)
 		jsonCols[colx.GetName()] = c
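One part of the canal-json change that is easy to miss is how a row is mapped to a physical partition when the message itself carries no table ID (EnableTiDBExtension off): setPhysicalTableID in canal_json_message.go above walks the range-partition definitions recovered from the parsed CREATE TABLE statement and picks the first partition whose upper bound is greater than the partition column's value. A simplified sketch of that routing rule follows; the types and values here are invented for illustration and are not the tiflow API:

// partition_routing_sketch.go: simplified illustration of the range-partition
// routing in setPhysicalTableID (canal_json_message.go above).
package main

import (
	"fmt"
	"strings"
)

type partitionDef struct {
	id       int64
	lessThan string // upper bound from the parsed DDL, or "MAXVALUE"
}

// route picks the first partition whose upper bound exceeds columnValue,
// comparing the values as strings exactly like the decoder does.
func route(columnValue string, defs []partitionDef) (int64, error) {
	for _, p := range defs {
		if p.lessThan == "MAXVALUE" {
			return p.id, nil
		}
		if strings.Compare(columnValue, p.lessThan) < 0 {
			return p.id, nil
		}
	}
	return 0, fmt.Errorf("cannot find partition for column value %s", columnValue)
}

func main() {
	defs := []partitionDef{
		{id: 101, lessThan: "10"},
		{id: 102, lessThan: "20"},
		{id: 103, lessThan: "MAXVALUE"},
	}
	id, _ := route("15", defs)
	fmt.Println(id) // 102: "15" sorts before "20"
}

Note that because the bounds are compared with strings.Compare, matching is lexicographic rather than numeric ("9" sorts after "10"); hash, key, and list partitioning are left unimplemented behind the todo above.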